fix: memory pool close issue

This commit is contained in:
dapan1121 2024-11-15 17:05:46 +08:00
parent 85fc4dfc2b
commit 15e26827db
3 changed files with 234 additions and 111 deletions

View File

@ -42,7 +42,7 @@ extern "C" {
#define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL) #define MP_RETIRE_UNIT_MIN_SIZE (50 * 1048576UL)
#define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL) #define MP_CFG_UPDATE_MIN_RESERVE_SIZE (50 * 1024 * 1048576UL)
#define MP_DEFAULT_RESERVE_MEM_PERCENT 20 #define MP_DEFAULT_RESERVE_MEM_PERCENT 1 // TODO 20 !!!!!!!!!!!!!!!!!!!!!
#define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576UL) #define MP_MIN_FREE_SIZE_AFTER_RESERVE (4 * 1024 * 1048576UL)
#define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576UL) #define MP_MIN_MEM_POOL_SIZE (5 * 1024 * 1048576UL)

View File

@ -1020,7 +1020,7 @@ void mpUpdateSystemAvailableMemorySize() {
void* mpMgmtThreadFunc(void* param) { void* mpMgmtThreadFunc(void* param) {
int32_t timeout = 0; int32_t timeout = 0;
int64_t retireSize = 0; int64_t retireSize = 0;
SMemPool* pPool = (SMemPool*)gMemPoolHandle; SMemPool* pPool = (SMemPool*)atomic_load_ptr(&gMemPoolHandle);
while (0 == atomic_load_8(&gMPMgmt.modExit)) { while (0 == atomic_load_8(&gMPMgmt.modExit)) {
mpUpdateSystemAvailableMemorySize(); mpUpdateSystemAvailableMemorySize();
@ -1045,6 +1045,8 @@ void* mpMgmtThreadFunc(void* param) {
} }
*/ */
} }
taosMemPoolModDestroy();
return NULL; return NULL;
} }
@ -1070,7 +1072,9 @@ _return:
void mpModInit(void) { void mpModInit(void) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
gMPMgmt.modExit = 0;
taosInitRWLatch(&gMPMgmt.poolLock); taosInitRWLatch(&gMPMgmt.poolLock);
gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES); gMPMgmt.poolList = taosArrayInit(10, POINTER_BYTES);
@ -1080,11 +1084,11 @@ void mpModInit(void) {
gMPMgmt.strategy = E_MP_STRATEGY_DIRECT; gMPMgmt.strategy = E_MP_STRATEGY_DIRECT;
gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0); //gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0);
if (TSDB_CODE_SUCCESS != gMPMgmt.code) { //if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
uError("failed to init sem2, error: 0x%x", gMPMgmt.code); // uError("failed to init sem2, error: 0x%x", gMPMgmt.code);
return; // return;
} //}
gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS; gMPMgmt.waitMs = MP_DEFAULT_MEM_CHK_INTERVAL_MS;
@ -1093,6 +1097,13 @@ _return:
gMPMgmt.code = code; gMPMgmt.code = code;
} }
void mpFreePool(void* p) {
SMemPool* pPool = *(void**)p;
taosMemoryFree(pPool);
}
void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) {
SMemPool* pPool = (SMemPool*)poolHandle; SMemPool* pPool = (SMemPool*)poolHandle;
SMPSession* pSession = (SMPSession*)session; SMPSession* pSession = (SMPSession*)session;
@ -1129,7 +1140,9 @@ int32_t taosMemPoolOpen(char* poolName, SMemPoolCfg* cfg, void** poolHandle) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMemPool* pPool = NULL; SMemPool* pPool = NULL;
MP_ERR_JRET(taosThreadOnce(&gMPoolInit, mpModInit)); //MP_ERR_JRET(taosThreadOnce(&gMPoolInit, mpModInit));
mpModInit();
if (TSDB_CODE_SUCCESS != gMPMgmt.code) { if (TSDB_CODE_SUCCESS != gMPMgmt.code) {
uError("init memory pool failed, code: 0x%x", gMPMgmt.code); uError("init memory pool failed, code: 0x%x", gMPMgmt.code);
MP_ERR_JRET(gMPMgmt.code); MP_ERR_JRET(gMPMgmt.code);
@ -1481,10 +1494,16 @@ void taosMemPoolClose(void* poolHandle) {
taosMemoryFree(pPool->name); taosMemoryFree(pPool->name);
mpDestroyCacheGroup(&pPool->sessionCache); mpDestroyCacheGroup(&pPool->sessionCache);
atomic_store_8(&gMPMgmt.modExit, 1);
} }
void taosMemPoolModDestroy(void) {
void taosMemPoolModDestroy(void) {
gMemPoolHandle = NULL;
taosArrayDestroyEx(gMPMgmt.poolList, mpFreePool);
gMPMgmt.poolList = NULL;
} }

View File

@ -42,7 +42,8 @@
namespace { namespace {
#define MPT_PRINTF (void)printf #define MPT_PRINTF(param, ...) (void)printf("[%" PRId64 ",%" PRId64 "] " param, mptCtx.caseLoop, mptCtx.jobLoop, __VA_ARGS__)
#define MPT_MAX_MEM_ACT_TIMES 300 #define MPT_MAX_MEM_ACT_TIMES 300
#define MPT_MAX_SESSION_NUM 100 #define MPT_MAX_SESSION_NUM 100
#define MPT_MAX_JOB_NUM 100 #define MPT_MAX_JOB_NUM 100
@ -244,8 +245,14 @@ typedef struct SMPTestCtx {
SHashObj* pJobs; SHashObj* pJobs;
BoundedQueue* pJobQueue; BoundedQueue* pJobQueue;
SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM];
TdThread dropThreadFp;
int32_t jobNum;
SMPTestJobCtx* jobCtxs; SMPTestJobCtx* jobCtxs;
SMPTestParam param; SMPTestParam param;
int8_t testDone;
int64_t caseLoop;
int64_t jobLoop;
} SMPTestCtx; } SMPTestCtx;
SMPTestCtx mptCtx = {0}; SMPTestCtx mptCtx = {0};
@ -311,11 +318,11 @@ void mptInit() {
osDefaultInit(); osDefaultInit();
mptInitLogFile(); mptInitLogFile();
mptCtrl.caseLoopTimes = 1; mptCtrl.caseLoopTimes = 100000;
mptCtrl.taskActTimes = 0; mptCtrl.taskActTimes = 0;
mptCtrl.maxSingleAllocSize = 104857600; mptCtrl.maxSingleAllocSize = 104857600;
mptCtrl.jobNum = 100; mptCtrl.jobNum = 100;
mptCtrl.jobExecTimes = 1; mptCtrl.jobExecTimes = 10;
mptCtrl.jobTaskNum = 0; mptCtrl.jobTaskNum = 0;
mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
@ -332,11 +339,51 @@ void mptInit() {
mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
} }
void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) {
assert(gMemPoolHandle);
assert(pSession);
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession); void mptDestroyJobInfo(SMPTJobInfo* pJob) {
taosMemFree(pJob->memInfo);
taosHashCleanup(pJob->pSessions);
}
void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
SMPTJobInfo *pJobInfo = pJobCtx->pJob;
char id[sizeof(tId) + sizeof(eId) + 1] = {0};
MPT_SET_TEID(id, tId, eId);
int32_t remainSessions = 0;
(void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id));
taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions);
if (0 == remainSessions) {
MPT_LOCK(MPT_WRITE, &pJobInfo->lock);
if (0 == taosHashGetSize(pJobInfo->pSessions)) {
atomic_store_8(&pJobInfo->destroyed, 1);
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobInfo->errCode);
mptDestroyJobInfo(pJobInfo);
MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
pJobCtx->pJob = NULL;
(void)taosHashRemove(mptCtx.pJobs, &qId, sizeof(qId));
uInfo("the whole query job removed");
} else {
MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
}
}
}
void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
assert(gMemPoolHandle);
SMPTestTaskCtx* pTask = &pJobCtx->taskCtxs[taskIdx];
mptEnableMemoryPoolUsage(gMemPoolHandle, pJobCtx->pSessions[taskIdx]);
for (int32_t i = 0; i < pTask->memIdx; ++i) { for (int32_t i = 0; i < pTask->memIdx; ++i) {
pTask->stat.times.memFree.exec++; pTask->stat.times.memFree.exec++;
pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p); pTask->stat.bytes.memFree.exec+=mptMemorySize(pTask->pMemList[i].p);
@ -345,6 +392,9 @@ void mptDestroyTaskCtx(SMPTestTaskCtx* pTask, void* pSession) {
pTask->pMemList[i].p = NULL; pTask->pMemList[i].p = NULL;
} }
mptDisableMemoryPoolUsage(); mptDisableMemoryPoolUsage();
mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[taskIdx].taskId, 0, taskIdx, pJobCtx, pJobCtx->pSessions[taskIdx]);
pJobCtx->pSessions[taskIdx] = NULL;
for (int32_t i = 0; i < pTask->npMemIdx; ++i) { for (int32_t i = 0; i < pTask->npMemIdx; ++i) {
taosMemFreeClear(pTask->npMemList[i].p); taosMemFreeClear(pTask->npMemList[i].p);
@ -374,13 +424,6 @@ int32_t mptInitJobInfo(uint64_t qId, SMPTJobInfo* pJob) {
} }
void mptDestroyJobInfo(SMPTJobInfo* pJob) {
taosMemFree(pJob->memInfo);
taosHashCleanup(pJob->pSessions);
}
int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJobCtx, void** ppSession) { int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* pJobCtx, void** ppSession) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SMPTJobInfo* pJob = NULL; SMPTJobInfo* pJob = NULL;
@ -418,7 +461,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p
pJobCtx->pJob = pJob; pJobCtx->pJob = pJob;
pJob->pCtx = pJobCtx; pJob->pCtx = pJobCtx;
char id[sizeof(tId) + sizeof(eId) + 0] = {0}; char id[sizeof(tId) + sizeof(eId) + 1] = {0};
MPT_SET_TEID(id, tId, eId); MPT_SET_TEID(id, tId, eId);
assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id)); assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id));
@ -466,37 +509,19 @@ void mptInitJob(int32_t idx) {
uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum); uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum);
} }
void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
SMPTJobInfo *pJobInfo = pJobCtx->pJob;
char id[sizeof(tId) + sizeof(eId) + 1] = {0};
MPT_SET_TEID(id, tId, eId);
int32_t remainSessions = 0;
(void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id)); void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
SMPStatDetail* pStat = NULL;
taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions); int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[taskIdx], &pStat, &allocSize, NULL);
if (0 == remainSessions) { int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
MPT_LOCK(MPT_WRITE, &pJobInfo->lock);
if (0 == taosHashGetSize(pJobInfo->pSessions)) { assert(allocSize == usedSize);
atomic_store_8(&pJobInfo->destroyed, 1); assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[taskIdx].stat, sizeof(*pStat)));
uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobInfo->errCode); mptDestroyTaskCtx(pJobCtx, taskIdx);
mptDestroyJobInfo(pJobInfo);
MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
pJobCtx->pJob = NULL;
(void)taosHashRemove(mptCtx.pJobs, &qId, sizeof(qId));
uInfo("the whole query job removed");
} else {
MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock);
}
}
} }
int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
if (taosWTryLockLatch(&pJobCtx->jobExecLock)) { if (taosWTryLockLatch(&pJobCtx->jobExecLock)) {
return -1; return -1;
@ -505,18 +530,7 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) {
uint64_t jobId = pJobCtx->jobId; uint64_t jobId = pJobCtx->jobId;
for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { for (int32_t i = 0; i < pJobCtx->taskNum; ++i) {
if (!pJobCtx->taskCtxs[i].destoryed) { if (!pJobCtx->taskCtxs[i].destoryed) {
SMPStatDetail* pStat = NULL; mptDestroyTask(pJobCtx, i);
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[i], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[i].stat, sizeof(*pStat)));
mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[i].taskId, 0, i, pJobCtx, pJobCtx->pSessions[i]);
pJobCtx->pSessions[i] = NULL;
mptDestroyTaskCtx(&pJobCtx->taskCtxs[i], pJobCtx->pSessions[i]);
} }
} }
@ -542,6 +556,10 @@ void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) {
} }
int32_t mptResetJob(SMPTestJobCtx* pJobCtx) { int32_t mptResetJob(SMPTestJobCtx* pJobCtx) {
if (NULL == pJobCtx->pJob) {
return -1;
}
if (atomic_load_8(&pJobCtx->pJob->retired)) { if (atomic_load_8(&pJobCtx->pJob->retired)) {
int32_t taskRunning = atomic_load_32(&pJobCtx->taskRunningNum); int32_t taskRunning = atomic_load_32(&pJobCtx->taskRunningNum);
if (0 == taskRunning) { if (0 == taskRunning) {
@ -557,8 +575,6 @@ int32_t mptResetJob(SMPTestJobCtx* pJobCtx) {
void mptRetireJob(SMPTJobInfo* pJob) { void mptRetireJob(SMPTJobInfo* pJob) {
SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx; SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx;
mptCheckCompareJobInfo(pCtx);
int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum); int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum);
if (0 == taskRunning) { if (0 == taskRunning) {
@ -1039,56 +1055,78 @@ void mptCheckPoolUsedSize(int32_t jobNum) {
int64_t usedSize = 0; int64_t usedSize = 0;
bool needEnd = false; bool needEnd = false;
int64_t poolUsedSize = 0; int64_t poolUsedSize = 0;
int32_t sleepTimes = 0;
taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd);
while (true) {
for (int32_t i = 0; i < jobNum; ++i) { taosMemPoolGetUsedSizeBegin(gMemPoolHandle, &usedSize, &needEnd);
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
if (NULL == pJobCtx->pJob) { poolUsedSize = 0;
for (int32_t i = 0; i < jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
if (NULL == pJobCtx->pJob) {
continue;
}
sleepTimes = 0;
while (taosRTryLockLatch(&pJobCtx->jobExecLock)) {
taosUsleep(1);
sleepTimes++;
if (sleepTimes > 100) {
break;
}
}
if (sleepTimes > 100) {
break;
}
int64_t jobUsedSize = 0;
for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
if (!pJobCtx->taskCtxs[m].destoryed) {
SMPStatDetail* pStat = NULL;
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat)));
jobUsedSize += allocSize;
}
}
assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);
taosRUnLockLatch(&pJobCtx->jobExecLock);
poolUsedSize += jobUsedSize;
}
taosMemPoolGetUsedSizeEnd(gMemPoolHandle);
if (sleepTimes > 100) {
continue; continue;
} }
while (taosRTryLockLatch(&pJobCtx->jobExecLock)) { assert(poolUsedSize <= usedSize);
} break;
}
int64_t jobUsedSize = 0;
for (int32_t m = 0; m < pJobCtx->taskNum; ++m) {
if (!pJobCtx->taskCtxs[m].destoryed) {
SMPStatDetail* pStat = NULL;
int64_t allocSize = 0;
taosMemPoolGetSessionStat(pJobCtx->pSessions[m], &pStat, &allocSize, NULL);
int64_t usedSize = MEMPOOL_GET_USED_SIZE(pStat);
assert(allocSize == usedSize);
assert(0 == memcmp(pStat, &pJobCtx->taskCtxs[m].stat, sizeof(*pStat)));
jobUsedSize += allocSize;
}
}
assert(pJobCtx->pJob->memInfo->allocMemSize == jobUsedSize);
taosRUnLockLatch(&pJobCtx->jobExecLock);
poolUsedSize += jobUsedSize;
}
assert(poolUsedSize == usedSize);
taosMemPoolGetUsedSizeEnd(gMemPoolHandle);
} }
void* mptThreadFunc(void* param) { void* mptRunThreadFunc(void* param) {
SMPTestThread* pThread = (SMPTestThread*)param; SMPTestThread* pThread = (SMPTestThread*)param;
int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1;
int32_t jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
for (int32_t n = 0; n < jobExecTimes; ++n) { for (int32_t n = 0; n < jobExecTimes; ++n) {
MPT_PRINTF("Thread %d start the %d:%d job loops - jobNum:%d\n", pThread->idx, n, jobExecTimes, jobNum); mptCtx.jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : (taosRand() % MPT_MAX_JOB_NUM + 1);
mptCtx.jobLoop = n;
for (int32_t i = 0; i < jobNum; ++i) { MPT_PRINTF("Thread %d start the %d:%d job loops - jobNum:%d\n", pThread->idx, n, jobExecTimes, mptCtx.jobNum);
for (int32_t i = 0; i < mptCtx.jobNum; ++i) {
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i];
MPT_PRINTF(" Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, jobNum, pJobCtx->jobIdx, pJobCtx->jobId); MPT_PRINTF(" Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, mptCtx.jobNum, pJobCtx->jobIdx, pJobCtx->jobId);
if (mptResetJob(pJobCtx)) { if (mptResetJob(pJobCtx)) {
continue; continue;
@ -1110,6 +1148,10 @@ void* mptThreadFunc(void* param) {
break; break;
} }
if (pJobCtx->taskCtxs[m].destoryed) {
continue;
}
int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES));
MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, m, pJobCtx->taskNum); MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, m, pJobCtx->taskNum);
@ -1125,22 +1167,73 @@ void* mptThreadFunc(void* param) {
MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n); MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n);
mptCheckPoolUsedSize(jobNum); mptCheckPoolUsedSize(mptCtx.jobNum);
} }
return NULL; return NULL;
} }
void mptStartThreadTest(int32_t threadIdx) { void* mptDropThreadFunc(void* param) {
int32_t jobIdx = 0, taskIdx = 0, code = 0;
uint64_t taskId = 0;
while (!atomic_load_8(&mptCtx.testDone)) {
taosMsleep(200);
jobIdx = taosRand() % mptCtx.jobNum;
SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[jobIdx];
taosWLockLatch(&pJobCtx->jobExecLock);
if (NULL == pJobCtx->pJob || pJobCtx->pJob->destroyed) {
taosWUnLockLatch(&pJobCtx->jobExecLock);
continue;
}
if (taosRand() % 20) {
taskIdx = taosRand() % pJobCtx->taskNum;
if (pJobCtx->taskCtxs[taskIdx].destoryed) {
taosWUnLockLatch(&pJobCtx->jobExecLock);
continue;
}
taskId = pJobCtx->taskCtxs[taskIdx].taskId;
mptDestroyTask(pJobCtx, taskIdx);
MPT_PRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId);
taosWUnLockLatch(&pJobCtx->jobExecLock);
} else {
taosWUnLockLatch(&pJobCtx->jobExecLock);
code = mptDestroyJob(pJobCtx, false);
if (0 == code) {
MPT_PRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId);
}
}
}
return NULL;
}
void mptStartRunThread(int32_t threadIdx) {
TdThreadAttr thattr; TdThreadAttr thattr;
ASSERT_EQ(0, taosThreadAttrInit(&thattr)); ASSERT_EQ(0, taosThreadAttrInit(&thattr));
ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE)); ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE));
mptCtx.threadCtxs[threadIdx].idx = threadIdx; mptCtx.threadCtxs[threadIdx].idx = threadIdx;
ASSERT_EQ(0, taosThreadCreate(&mptCtx.threadCtxs[threadIdx].threadFp, &thattr, mptThreadFunc, &mptCtx.threadCtxs[threadIdx])); ASSERT_EQ(0, taosThreadCreate(&mptCtx.threadCtxs[threadIdx].threadFp, &thattr, mptRunThreadFunc, &mptCtx.threadCtxs[threadIdx]));
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
} }
void mptStartDropThread() {
TdThreadAttr thattr;
ASSERT_EQ(0, taosThreadAttrInit(&thattr));
ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE));
ASSERT_EQ(0, taosThreadCreate(&mptCtx.dropThreadFp, &thattr, mptDropThreadFunc, NULL));
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
}
void mptDestroyJobs() { void mptDestroyJobs() {
int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM;
@ -1152,22 +1245,33 @@ void mptDestroyJobs() {
void mptRunCase(SMPTestParam* param, int32_t times) { void mptRunCase(SMPTestParam* param, int32_t times) {
MPT_PRINTF("\t case start the %dth running\n", times); MPT_PRINTF("\t case start the %dth running\n", times);
mptCtx.caseLoop = times;
memcpy(&mptCtx.param, param, sizeof(SMPTestParam)); memcpy(&mptCtx.param, param, sizeof(SMPTestParam));
atomic_store_8(&mptCtx.testDone, 0);
mptInitPool(); mptInitPool();
mptInitJobs(); mptInitJobs();
for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) {
mptStartThreadTest(i); mptStartRunThread(i);
} }
mptStartDropThread();
for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) {
(void)taosThreadJoin(mptCtx.threadCtxs[i].threadFp, NULL); (void)taosThreadJoin(mptCtx.threadCtxs[i].threadFp, NULL);
} }
atomic_store_8(&mptCtx.testDone, 1);
(void)taosThreadJoin(mptCtx.dropThreadFp, NULL);
mptDestroyJobs(); mptDestroyJobs();
taosMemPoolClose(gMemPoolHandle);
gMemPoolHandle = NULL;
MPT_PRINTF("\t case end the %dth running\n", times); MPT_PRINTF("\t case end the %dth running\n", times);
} }