fix: mempool ut issues

This commit is contained in:
dapan1121 2024-11-18 15:05:02 +08:00
parent ce2b2bb024
commit 9941dcae9b
6 changed files with 123 additions and 27 deletions

View File

@ -128,6 +128,8 @@ void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* rema
int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob);
void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg);
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName);
int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock);
void taosMemPoolUnLockPool(void* poolHandle, bool readLock);
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd);
void taosMemPoolGetUsedSizeEnd(void* poolHandle);
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize);

View File

@ -167,7 +167,7 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
SSchJob *pJob = NULL;
(void)schAcquireJob(*jobId, &pJob);
if (NULL == pJob) {
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
return;
}

View File

@ -371,6 +371,9 @@ enum {
(_chunkNum)++; \
} while (0)
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
#define MP_LOCK(type, _lock) \
do { \
if (MP_READ == (type)) { \

View File

@ -24,7 +24,7 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint
void* res = NULL;
int64_t nSize = *size;
taosRLockLatch(&pPool->cfgLock);
MP_LOCK(MP_READ, &pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size));
@ -46,7 +46,7 @@ int32_t mpDirectAlloc(SMemPool* pPool, SMPSession* pSession, int64_t* size, uint
_return:
taosRUnLockLatch(&pPool->cfgLock);
MP_UNLOCK(MP_READ, &pPool->cfgLock);
*ppRes = res;
*size = nSize;
@ -64,7 +64,7 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori
*origSize = oSize;
}
taosRLockLatch(&pPool->cfgLock); // tmp test
MP_LOCK(MP_READ, &pPool->cfgLock); // tmp test
taosMemFree(ptr);
@ -75,14 +75,14 @@ void mpDirectFree(SMemPool* pPool, SMPSession* pSession, void *ptr, int64_t* ori
(void)atomic_sub_fetch_64(&pPool->allocMemSize, oSize);
taosRUnLockLatch(&pPool->cfgLock);
MP_UNLOCK(MP_READ, &pPool->cfgLock);
}
int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int64_t* size, int64_t* origSize) {
int32_t code = TSDB_CODE_SUCCESS;
int64_t nSize = *size;
taosRLockLatch(&pPool->cfgLock);
MP_LOCK(MP_READ, &pPool->cfgLock);
MP_ERR_JRET(mpChkQuotaOverflow(pPool, pSession, *size - *origSize));
@ -96,7 +96,7 @@ int32_t mpDirectRealloc(SMemPool* pPool, SMPSession* pSession, void **pPtr, int6
_return:
taosRUnLockLatch(&pPool->cfgLock);
MP_UNLOCK(MP_READ, &pPool->cfgLock);
if (code) {
mpDirectFree(pPool, pSession, *pPtr, origSize);

View File

@ -1545,6 +1545,34 @@ int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob) {
return code;
}
int32_t taosMemPoolTryLockPool(void* poolHandle, bool readLock) {
if (NULL == poolHandle) {
return TSDB_CODE_INVALID_PARA;
}
SMemPool* pPool = (SMemPool*)poolHandle;
if (readLock) {
MP_LOCK(MP_READ, &pPool->cfgLock);
} else {
MP_LOCK(MP_WRITE, &pPool->cfgLock);
}
return TSDB_CODE_SUCCESS;
}
void taosMemPoolUnLockPool(void* poolHandle, bool readLock) {
if (NULL == poolHandle) {
return;
}
SMemPool* pPool = (SMemPool*)poolHandle;
if (readLock) {
MP_UNLOCK(MP_READ, &pPool->cfgLock);
} else {
MP_UNLOCK(MP_WRITE, &pPool->cfgLock);
}
}
void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* needEnd) {
if (NULL == poolHandle) {
*usedSize = 0;
@ -1554,7 +1582,6 @@ void taosMemPoolGetUsedSizeBegin(void* poolHandle, int64_t* usedSize, bool* need
SMemPool* pPool = (SMemPool*)poolHandle;
taosWLockLatch(&pPool->cfgLock);
*needEnd = true;
*usedSize = atomic_load_64(&pPool->allocMemSize);
}
@ -1565,7 +1592,7 @@ void taosMemPoolGetUsedSizeEnd(void* poolHandle) {
return;
}
taosWUnLockLatch(&pPool->cfgLock);
MP_UNLOCK(MP_WRITE, &pPool->cfgLock);
}
int32_t taosMemPoolGetSessionStat(void* session, SMPStatDetail** ppStat, int64_t* allocSize, int64_t* maxAllocSize) {

View File

@ -60,12 +60,43 @@ enum {
MPT_WRITE,
};
threadlocal void* mptThreadPoolHandle = NULL;
threadlocal void* mptThreadPoolSession = NULL;
#define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000
static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) {
int32_t code = 0;
do {
if (MPT_READ == (type)) {
if (atomic_load_32((_lock)) < 0) {
uError("invalid lock value before try read lock");
break;
}
uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
code = taosRTryLockLatch(_lock);
uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
if (atomic_load_32((_lock)) <= 0) {
uError("invalid lock value after try read lock");
break;
}
} else {
if (atomic_load_32((_lock)) < 0) {
uError("invalid lock value before try write lock");
break;
}
uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
code = taosWTryLockLatch(_lock);
uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__);
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) {
uError("invalid lock value after try write lock");
break;
}
}
} while (0);
return code;
}
#define MPT_LOCK(type, _lock) \
do { \
if (MPT_READ == (type)) { \
@ -73,7 +104,9 @@ threadlocal void* mptThreadPoolSession = NULL;
uError("invalid lock value before read lock"); \
break; \
} \
uDebug("MPT RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRLockLatch(_lock); \
uDebug("MPT RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) <= 0) { \
uError("invalid lock value after read lock"); \
break; \
@ -83,7 +116,9 @@ threadlocal void* mptThreadPoolSession = NULL;
uError("invalid lock value before write lock"); \
break; \
} \
uDebug("MPT WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWLockLatch(_lock); \
uDebug("MPT WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \
uError("invalid lock value after write lock"); \
break; \
@ -98,7 +133,9 @@ threadlocal void* mptThreadPoolSession = NULL;
uError("invalid lock value before read unlock"); \
break; \
} \
uDebug("MPT RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosRUnLockLatch(_lock); \
uDebug("MPT RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) < 0) { \
uError("invalid lock value after read unlock"); \
break; \
@ -108,7 +145,9 @@ threadlocal void* mptThreadPoolSession = NULL;
uError("invalid lock value before write unlock"); \
break; \
} \
uDebug("MPT WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
taosWUnLockLatch(_lock); \
uDebug("MPT WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \
if (atomic_load_32((_lock)) < 0) { \
uError("invalid lock value after write unlock"); \
break; \
@ -117,6 +156,12 @@ threadlocal void* mptThreadPoolSession = NULL;
} while (0)
threadlocal void* mptThreadPoolHandle = NULL;
threadlocal void* mptThreadPoolSession = NULL;
int32_t tsSingleQueryMaxMemorySize = 0; //MB
#define MPT_SET_TEID(id, tId, eId) \
do { \
*(uint64_t *)(id) = (tId); \
@ -227,7 +272,7 @@ typedef struct {
int32_t jobQuota;
bool reserveMode;
int64_t upperLimitSize;
int32_t reserveSize;
int32_t reserveSize; //MB
int32_t threadNum;
int32_t randTask;
} SMPTestParam;
@ -523,7 +568,7 @@ void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
}
int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
if (taosWTryLockLatch(&pJobCtx->jobExecLock)) {
if (MPT_TRY_LOCK(MPT_WRITE, &pJobCtx->jobExecLock)) {
return -1;
}
@ -544,7 +589,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
mptInitJob(jobIdx);
}
taosWUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
MPT_PRINTF(" JOB:0x%x retired\n", jobId);
@ -1058,6 +1103,11 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
int32_t sleepTimes = 0;
while (true) {
if (taosMemPoolTryLockPool(gMemPoolHandle, false)) {
taosUsleep(1);
continue;
}
taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd);
poolUsedSize = 0;
@ -1066,7 +1116,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
sleepTimes = 0;
while (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
while (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) {
taosUsleep(1);
sleepTimes++;
if (sleepTimes > 100) {
@ -1075,10 +1125,12 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
}
if (sleepTimes > 100) {
MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
break;
}
if (NULL == pJobCtx->pJob) {
MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
continue;
}
@ -1099,7 +1151,7 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);
taosRUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
poolUsedSize += jobUsedSize;
}
@ -1133,7 +1185,7 @@ void* mptRunThreadFunc(void* param) {
continue;
}
if (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) {
continue;
}
@ -1141,6 +1193,7 @@ void* mptRunThreadFunc(void* param) {
int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
int32_t taskIdx = taosRand() % pJobCtx->taskNum;
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes);
MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
continue;
}
@ -1159,7 +1212,7 @@ void* mptRunThreadFunc(void* param) {
mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, actTimes);
}
taosRUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock);
MPT_PRINTF(" Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired);
@ -1181,18 +1234,24 @@ void* mptDropThreadFunc(void* param) {
while (!atomic_load_8(&mptCtx.testDone)) {
taosMsleep(200);
if (taosMemPoolTryLockPool(gMemPoolHandle, true)) {
continue;
}
jobIdx = taosRand() % mptCtx.jobNum;
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[jobIdx];
taosWLockLatch(&pJobCtx->jobExecLock);
MPT_LOCK(MPT_WRITE, &pJobCtx->jobExecLock);
if (NULL == pJobCtx->pJob || pJobCtx->pJob->destroyed) {
taosWUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
taosMemPoolUnLockPool(gMemPoolHandle, true);
continue;
}
if (taosRand() % 20) {
taskIdx = taosRand() % pJobCtx->taskNum;
if (pJobCtx->taskCtxs[taskIdx].destoryed) {
taosWUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
taosMemPoolUnLockPool(gMemPoolHandle, true);
continue;
}
@ -1200,14 +1259,16 @@ void* mptDropThreadFunc(void* param) {
mptDestroyTask(pJobCtx, taskIdx);
MPT_PRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId);
taosWUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
} else {
taosWUnLockLatch(&pJobCtx->jobExecLock);
MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock);
code = mptDestroyJob(pJobCtx, false);
if (0 == code) {
MPT_PRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId);
}
}
taosMemPoolUnLockPool(gMemPoolHandle, true);
}
return NULL;
@ -1248,6 +1309,8 @@ void mptRunCase(SMPTestParam* param, int32_t times) {
mptCtx.caseLoop = times;
memcpy(&mptCtx.param, param, sizeof(SMPTestParam));
tsSingleQueryMaxMemorySize = param->jobQuota;
atomic_store_8(&mptCtx.testDone, 0);
mptInitPool();
@ -1281,9 +1344,9 @@ void mptPrintTestBeginInfo(char* caseName, SMPTestParam* param) {
MPT_PRINTF("\t case loop times: %d\n", mptCtrl.caseLoopTimes);
MPT_PRINTF("\t task max act times: %d\n", mptCtrl.taskActTimes ? mptCtrl.taskActTimes : MPT_MAX_MEM_ACT_TIMES);
MPT_PRINTF("\t max single alloc size: %" PRId64 "\n", mptCtrl.maxSingleAllocSize);
MPT_PRINTF("\t job quota size: %" PRId64 "\n", param->jobQuota);
MPT_PRINTF("\t job quota size: %dMB\n", param->jobQuota);
MPT_PRINTF("\t reserve mode: %d\n", param->reserveMode);
//MPT_PRINTF("\t upper limit size: %" PRId64 "\n", param->upperLimitSize);
MPT_PRINTF("\t reserve size: %dMB\n", param->reserveSize);
MPT_PRINTF("\t test thread num: %d\n", param->threadNum);
MPT_PRINTF("\t random exec task: %d\n", param->randTask);
}
@ -1325,6 +1388,7 @@ TEST(FuncTest, SingleThreadTest) {
SMPTestParam param = {0};
param.reserveMode = true;
param.threadNum = 1;
param.jobQuota = 1024;
mptPrintTestBeginInfo(caseName, &param);