diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 9b78f9de37..315c92bdba 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -128,6 +128,8 @@ void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* rema int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); +int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock); +void taosMemPoolUnLockPool(void* poolHandle, bool readLock); void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd); void taosMemPoolGetUsedSizeEnd(void* poolHandle); int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 21aaa6cef5..21659d9cd5 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -167,7 +167,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) { SSchJob *pJob = NULL; (void)schAcquireJob(*jobId, &pJob); if (NULL == pJob) { - qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); + qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId); return; } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 32973f9848..421c4e213e 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -371,6 +371,9 @@ enum { (_chunkNum)++; \ } while (0) +#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 + + #define MP_LOCK(type, _lock) \ do { \ if (MP_READ == (type)) { \ diff --git a/source/util/src/mpDirect.c b/source/util/src/mpDirect.c index 2de4d71c56..a08ceab8f4 100755 --- a/source/util/src/mpDirect.c +++ b/source/util/src/mpDirect.c @@ -24,7 +24,7 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint void* res = NULL; int64_t nSize = *size; - taosRLockLatch(&pPool->cfgLock); + MP_LOCK(MP_READ, &pPool->cfgLock); MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size)); @@ -46,7 +46,7 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint _return: - taosRUnLockLatch(&pPool->cfgLock); + MP_UNLOCK(MP_READ, &pPool->cfgLock); *ppRes = res; *size = nSize; @@ -64,7 +64,7 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori *origSize = oSize; } - taosRLockLatch(&pPool->cfgLock); // tmp test + MP_LOCK(MP_READ, &pPool->cfgLock); // tmp test taosMemFree(ptr); @@ -75,14 +75,14 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori (void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize); - taosRUnLockLatch(&pPool->cfgLock); + MP_UNLOCK(MP_READ, &pPool->cfgLock); } int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) { int32_t code = TSDB_CODE_SUCCESS; int64_t nSize = *size; - taosRLockLatch(&pPool->cfgLock); + MP_LOCK(MP_READ, &pPool->cfgLock); MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size - *origSize)); @@ -96,7 +96,7 @@ int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int6 _return: - taosRUnLockLatch(&pPool->cfgLock); + MP_UNLOCK(MP_READ, &pPool->cfgLock); if (code) { mpDirectFree(pPool, pSession, *pPtr, origSize); diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 83dc916d49..8cd6a11598 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -1545,6 +1545,34 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob) { return code; } +int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock) { + if (NULL == poolHandle) { + return TSDB_CODE_INVALID_PARA; + } + + SMemPool* pPool = (SMemPool*)poolHandle; + if (readLock) { + MP_LOCK(MP_READ, &pPool->cfgLock); + } else { + MP_LOCK(MP_WRITE, &pPool->cfgLock); + } + + return TSDB_CODE_SUCCESS; +} + +void taosMemPoolUnLockPool(void* poolHandle, bool readLock) { + if (NULL == poolHandle) { + return; + } + + SMemPool* pPool = (SMemPool*)poolHandle; + if (readLock) { + MP_UNLOCK(MP_READ, &pPool->cfgLock); + } else { + MP_UNLOCK(MP_WRITE, &pPool->cfgLock); + } +} + void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) { if (NULL == poolHandle) { *usedSize = 0; @@ -1554,7 +1582,6 @@ void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* need SMemPool* pPool = (SMemPool*)poolHandle; - taosWLockLatch(&pPool->cfgLock); *needEnd = true; *usedSize = atomic_load_64(&pPool->allocMemSize); } @@ -1565,7 +1592,7 @@ void taosMemPoolGetUsedSizeEnd(void* poolHandle) { return; } - taosWUnLockLatch(&pPool->cfgLock); + MP_UNLOCK(MP_WRITE, &pPool->cfgLock); } int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 1fecb10b47..687f3e595b 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -60,12 +60,43 @@ enum { MPT_WRITE, }; - -threadlocal void* mptThreadPoolHandle = NULL; -threadlocal void* mptThreadPoolSession = NULL; - #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 + +static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) { + int32_t code = 0; + + do { + if (MPT_READ == (type)) { + if (atomic_load_32((_lock)) < 0) { + uError("invalid lock value before try read lock"); + break; + } + uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + code = taosRTryLockLatch(_lock); + uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + if (atomic_load_32((_lock)) <= 0) { + uError("invalid lock value after try read lock"); + break; + } + } else { + if (atomic_load_32((_lock)) < 0) { + uError("invalid lock value before try write lock"); + break; + } + uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + code = taosWTryLockLatch(_lock); + uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { + uError("invalid lock value after try write lock"); + break; + } + } + } while (0); + + return code; +} + #define MPT_LOCK(type, _lock) \ do { \ if (MPT_READ == (type)) { \ @@ -73,7 +104,9 @@ threadlocal void* mptThreadPoolSession = NULL; uError("invalid lock value before read lock"); \ break; \ } \ + uDebug("MPT RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRLockLatch(_lock); \ + uDebug("MPT RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ if (atomic_load_32((_lock)) <= 0) { \ uError("invalid lock value after read lock"); \ break; \ @@ -83,7 +116,9 @@ threadlocal void* mptThreadPoolSession = NULL; uError("invalid lock value before write lock"); \ break; \ } \ + uDebug("MPT WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWLockLatch(_lock); \ + uDebug("MPT WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ uError("invalid lock value after write lock"); \ break; \ @@ -98,7 +133,9 @@ threadlocal void* mptThreadPoolSession = NULL; uError("invalid lock value before read unlock"); \ break; \ } \ + uDebug("MPT RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosRUnLockLatch(_lock); \ + uDebug("MPT RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ if (atomic_load_32((_lock)) < 0) { \ uError("invalid lock value after read unlock"); \ break; \ @@ -108,7 +145,9 @@ threadlocal void* mptThreadPoolSession = NULL; uError("invalid lock value before write unlock"); \ break; \ } \ + uDebug("MPT WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ taosWUnLockLatch(_lock); \ + uDebug("MPT WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ if (atomic_load_32((_lock)) < 0) { \ uError("invalid lock value after write unlock"); \ break; \ @@ -117,6 +156,12 @@ threadlocal void* mptThreadPoolSession = NULL; } while (0) + +threadlocal void* mptThreadPoolHandle = NULL; +threadlocal void* mptThreadPoolSession = NULL; +int32_t tsSingleQueryMaxMemorySize = 0; //MB + + #define MPT_SET_TEID(id, tId, eId) \ do { \ *(uint64_t *)(id) = (tId); \ @@ -227,7 +272,7 @@ typedef struct { int32_t jobQuota; bool reserveMode; int64_t upperLimitSize; - int32_t reserveSize; + int32_t reserveSize; //MB int32_t threadNum; int32_t randTask; } SMPTestParam; @@ -523,7 +568,7 @@ void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) { } int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { - if (taosWTryLockLatch(&pJobCtx->jobExecLock)) { + if (MPT_TRY_LOCK(MPT_WRITE, &pJobCtx->jobExecLock)) { return -1; } @@ -544,7 +589,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { mptInitJob(jobIdx); } - taosWUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); MPT_PRINTF(" JOB:0x%x retired\n", jobId); @@ -1058,6 +1103,11 @@ void mptCheckPoolUsedSize(int32_t jobNum) { int32_t sleepTimes = 0; while (true) { + if (taosMemPoolTryLockPool(gMemPoolHandle, false)) { + taosUsleep(1); + continue; + } + taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd); poolUsedSize = 0; @@ -1066,7 +1116,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) { SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; sleepTimes = 0; - while (taosRTryLockLatch(&pJobCtx->jobExecLock)) { + while (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) { taosUsleep(1); sleepTimes++; if (sleepTimes > 100) { @@ -1075,10 +1125,12 @@ void mptCheckPoolUsedSize(int32_t jobNum) { } if (sleepTimes > 100) { + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); break; } if (NULL == pJobCtx->pJob) { + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); continue; } @@ -1099,7 +1151,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) { assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize); - taosRUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); poolUsedSize += jobUsedSize; } @@ -1133,7 +1185,7 @@ void* mptRunThreadFunc(void* param) { continue; } - if (taosRTryLockLatch(&pJobCtx->jobExecLock)) { + if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) { continue; } @@ -1141,6 +1193,7 @@ void* mptRunThreadFunc(void* param) { int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); int32_t taskIdx = taosRand() % pJobCtx->taskNum; mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes); + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); continue; } @@ -1159,7 +1212,7 @@ void* mptRunThreadFunc(void* param) { mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, actTimes); } - taosRUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); MPT_PRINTF(" Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired); @@ -1181,18 +1234,24 @@ void* mptDropThreadFunc(void* param) { while (!atomic_load_8(&mptCtx.testDone)) { taosMsleep(200); + if (taosMemPoolTryLockPool(gMemPoolHandle, true)) { + continue; + } + jobIdx = taosRand() % mptCtx.jobNum; SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[jobIdx]; - taosWLockLatch(&pJobCtx->jobExecLock); + MPT_LOCK(MPT_WRITE, &pJobCtx->jobExecLock); if (NULL == pJobCtx->pJob || pJobCtx->pJob->destroyed) { - taosWUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); + taosMemPoolUnLockPool(gMemPoolHandle, true); continue; } if (taosRand() % 20) { taskIdx = taosRand() % pJobCtx->taskNum; if (pJobCtx->taskCtxs[taskIdx].destoryed) { - taosWUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); + taosMemPoolUnLockPool(gMemPoolHandle, true); continue; } @@ -1200,14 +1259,16 @@ void* mptDropThreadFunc(void* param) { mptDestroyTask(pJobCtx, taskIdx); MPT_PRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId); - taosWUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); } else { - taosWUnLockLatch(&pJobCtx->jobExecLock); + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); code = mptDestroyJob(pJobCtx, false); if (0 == code) { MPT_PRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId); } } + + taosMemPoolUnLockPool(gMemPoolHandle, true); } return NULL; @@ -1248,6 +1309,8 @@ void mptRunCase(SMPTestParam* param, int32_t times) { mptCtx.caseLoop = times; memcpy(&mptCtx.param, param, sizeof(SMPTestParam)); + tsSingleQueryMaxMemorySize = param->jobQuota; + atomic_store_8(&mptCtx.testDone, 0); mptInitPool(); @@ -1281,9 +1344,9 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) { MPT_PRINTF("\t case loop times: %d\n", mptCtrl.caseLoopTimes); MPT_PRINTF("\t task max act times: %d\n", mptCtrl.taskActTimes ? mptCtrl.taskActTimes : MPT_MAX_MEM_ACT_TIMES); MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize); - MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota); + MPT_PRINTF("\t job quota size: %dMB\n", param->jobQuota); MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode); - //MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize); + MPT_PRINTF("\t reserve size: %dMB\n", param->reserveSize); MPT_PRINTF("\t test thread num: %d\n", param->threadNum); MPT_PRINTF("\t random exec task: %d\n", param->randTask); } @@ -1325,6 +1388,7 @@ TEST(FuncTest, SingleThreadTest) { SMPTestParam param = {0}; param.reserveMode = true; param.threadNum = 1; + param.jobQuota = 1024; mptPrintTestBeginInfo(caseName, ¶m);