From e2accc307c49e343810b773551cf0249aad40ad7 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Thu, 21 Nov 2024 10:22:05 +0800 Subject: [PATCH] fix: rpc dead loop issue --- cmake/libuv_CMakeLists.txt.in | 2 +- include/os/osMemPool.h | 5 +- source/libs/qworker/inc/qwInt.h | 2 +- source/libs/qworker/src/qwMem.c | 29 +- source/libs/qworker/src/qwUtil.c | 15 +- source/libs/qworker/src/qworker.c | 18 +- source/libs/transport/src/transCli.c | 3 + source/util/inc/tmempoolInt.h | 84 ++-- source/util/src/tmempool.c | 35 +- source/util/test/memPoolTest.cpp | 575 +++++++++++++++++---------- 10 files changed, 480 insertions(+), 288 deletions(-) diff --git a/cmake/libuv_CMakeLists.txt.in b/cmake/libuv_CMakeLists.txt.in index 3bfb52fe9b..44a15f5c8a 100644 --- a/cmake/libuv_CMakeLists.txt.in +++ b/cmake/libuv_CMakeLists.txt.in @@ -2,7 +2,7 @@ # libuv ExternalProject_Add(libuv GIT_REPOSITORY https://github.com/libuv/libuv.git - GIT_TAG v1.48.0 + GIT_TAG v1.49.0 SOURCE_DIR "${TD_CONTRIB_DIR}/libuv" BINARY_DIR "${TD_CONTRIB_DIR}/libuv" CONFIGURE_COMMAND "" diff --git a/include/os/osMemPool.h b/include/os/osMemPool.h index 315c92bdba..126660ab61 100644 --- a/include/os/osMemPool.h +++ b/include/os/osMemPool.h @@ -34,6 +34,9 @@ typedef enum MemPoolEvictPolicy { typedef struct SMemPoolJob { uint64_t jobId; uint64_t clientId; + + int32_t remainSession; + int64_t allocMemSize; int64_t maxAllocMemSize; } SMemPoolJob; @@ -124,7 +127,7 @@ void taosMemPoolClose(void* poolHandle); void taosMemPoolModDestroy(void); void taosAutoMemoryFree(void *ptr); int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, char *sessionId); -void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions); +void taosMemPoolDestroySession(void* poolHandle, void* session); int32_t taosMemPoolCallocJob(uint64_t jobId, uint64_t cId, void** ppJob); void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg); void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName); diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 822ff8eb29..77a99e8fa3 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -529,7 +529,7 @@ void qwDbgSimulateSleep(void); void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); -void qwDestroyJobInfo(SQWJobInfo* pJob); +void qwDestroyJobInfo(void* job); void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx); void qwRetireJob(SQWJobInfo* pJob); void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); diff --git a/source/libs/qworker/src/qwMem.c b/source/libs/qworker/src/qwMem.c index 59577f380b..3a4a9fc23d 100644 --- a/source/libs/qworker/src/qwMem.c +++ b/source/libs/qworker/src/qwMem.c @@ -65,7 +65,10 @@ int32_t qwInitJobHash(void) { if (NULL != atomic_val_compare_exchange_ptr(&gQueryMgmt.pJobInfo, NULL, pHash)) { taosHashCleanup(pHash); + return code; } + + taosHashSetFreeFp(gQueryMgmt.pJobInfo, qwDestroyJobInfo); } return code; @@ -75,19 +78,18 @@ int32_t qwInitJobHash(void) { void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session) { char id[sizeof(tId) + sizeof(eId) + 1] = {0}; QW_SET_TEID(id, tId, eId); - int32_t remainSessions = 0; + int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1); (void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id)); - taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions); + taosMemPoolDestroySession(gMemPoolHandle, session); if (0 == remainSessions) { QW_LOCK(QW_WRITE, &pJobInfo->lock); - if (0 == taosHashGetSize(pJobInfo->pSessions)) { + if (0 == taosHashGetSize(pJobInfo->pSessions) && 0 == atomic_load_32(&pJobInfo->memInfo->remainSession)) { atomic_store_8(&pJobInfo->destroyed, 1); - qwDestroyJobInfo(pJobInfo); QW_UNLOCK(QW_WRITE, &pJobInfo->lock); - + char id2[sizeof(qId) + sizeof(cId) + 1] = {0}; QW_SET_QCID(id2, qId, cId); (void)taosHashRemove(gQueryMgmt.pJobInfo, id2, sizeof(id2)); @@ -137,10 +139,15 @@ int32_t qwRetrieveJobInfo(QW_FPARAMS_DEF, SQWJobInfo** ppJob) { } } + QW_LOCK(QW_READ, &pJob->lock); if (atomic_load_8(&pJob->destroyed)) { + QW_UNLOCK(QW_READ, &pJob->lock); continue; } + atomic_add_fetch_32(&pJob->memInfo->remainSession, 1); + QW_UNLOCK(QW_READ, &pJob->lock); + break; } @@ -172,19 +179,11 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { QW_ERR_JRET(taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id)); session.sessionMp = *ppSession; - QW_LOCK(QW_READ, &pJob->lock); - if (atomic_load_8(&pJob->destroyed)) { - QW_UNLOCK(QW_READ, &pJob->lock); - continue; - } - code = taosHashPut(pJob->pSessions, id, sizeof(id), &session, sizeof(session)); if (TSDB_CODE_SUCCESS != code) { - QW_UNLOCK(QW_READ, &pJob->lock); QW_TASK_ELOG("fail to put session into query session hash, code: 0x%x", code); QW_ERR_JRET(code); } - QW_UNLOCK(QW_READ, &pJob->lock); break; } while (true); @@ -192,6 +191,10 @@ int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession) { _return: if (NULL != pJob) { + if (TSDB_CODE_SUCCESS != code) { + qwDestroySession(QW_FPARAMS(), pJob, *ppSession); + } + taosHashRelease(gQueryMgmt.pJobInfo, pJob); } diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 41e36aacb8..0c41649e89 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -273,10 +273,6 @@ int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return qwAddTask void qwReleaseTaskCtx(SQWorker *mgmt, void *ctx) { taosHashRelease(mgmt->ctxHash, ctx); } void qwFreeTaskHandle(SQWTaskCtx *ctx) { - if (ctx->dynamicTask) { - return; - } - // Note: free/kill may in RC qTaskInfo_t otaskHandle = atomic_load_ptr(&ctx->taskHandle); if (otaskHandle && otaskHandle == atomic_val_compare_exchange_ptr(&ctx->taskHandle, otaskHandle, NULL)) { @@ -289,10 +285,6 @@ void qwFreeTaskHandle(SQWTaskCtx *ctx) { } void qwFreeSinkHandle(SQWTaskCtx *ctx) { - if (ctx->dynamicTask) { - return; - } - // Note: free/kill may in RC void* osinkHandle = atomic_load_ptr(&ctx->sinkHandle); if (osinkHandle && osinkHandle == atomic_val_compare_exchange_ptr(&ctx->sinkHandle, osinkHandle, NULL)) { @@ -590,6 +582,7 @@ void qwCloseRef(void) { gQwMgmt.qwRef = -1; taosHashCleanup(gQueryMgmt.pJobInfo); + gQueryMgmt.pJobInfo = NULL; } taosWUnLockLatch(&gQwMgmt.lock); } @@ -734,11 +727,13 @@ void qwClearExpiredSch(SQWorker *mgmt, SArray *pExpiredSch) { } } -void qwDestroyJobInfo(SQWJobInfo* pJob) { - if (NULL == pJob) { +void qwDestroyJobInfo(void* job) { + if (NULL == job) { return; } + SQWJobInfo* pJob = (SQWJobInfo*)job; + taosMemoryFreeClear(pJob->memInfo); taosHashCleanup(pJob->pSessions); pJob->pSessions = NULL; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 2e5f6c766f..0168d4b9eb 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -89,7 +89,7 @@ int32_t qwHandleTaskComplete(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { _return: - if (!ctx->explain || ctx->explainRsped) { + if ((!ctx->dynamicTask) && (!ctx->explain || ctx->explainRsped)) { qwFreeTaskHandle(ctx); } @@ -490,7 +490,9 @@ int32_t qwQuickRspFetchReq(QW_FPARAMS_DEF, SQWTaskCtx *ctx, SQWMsg *qwMsg, int32 qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); - qwFreeSinkHandle(ctx); + if (!ctx->dynamicTask) { + qwFreeSinkHandle(ctx); + } } } @@ -876,7 +878,9 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); atomic_store_8((int8_t *)&ctx->queryContinue, 0); - qwFreeSinkHandle(ctx); + if (!ctx->dynamicTask) { + qwFreeSinkHandle(ctx); + } } qwMsg->connInfo = ctx->dataConnInfo; @@ -966,7 +970,9 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { qwBuildFetchRsp(rsp, &sOutput, dataLen, rawDataLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); - qwFreeSinkHandle(ctx); + if (!ctx->dynamicTask) { + qwFreeSinkHandle(ctx); + } } } @@ -1593,7 +1599,9 @@ int32_t qWorkerProcessLocalFetch(void *pMgmt, uint64_t sId, uint64_t qId, uint64 qwBuildFetchRsp(rsp, &sOutput, dataLen, rawLen, qComplete); if (qComplete) { atomic_store_8((int8_t *)&ctx->queryEnd, true); - qwFreeSinkHandle(ctx); + if (!ctx->dynamicTask) { + qwFreeSinkHandle(ctx); + } } break; diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 2aeffc6395..c9d85dfcb5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -1278,6 +1278,7 @@ static void cliHandleException(SCliConn* conn) { if (conn->registered) { int8_t ref = transGetRefCount(conn); if (ref == 0 && !uv_is_closing((uv_handle_t*)conn->stream)) { + tTrace("%s conn %p fd %d,%d,%d,%p uv_closed", CONN_GET_INST_LABEL(conn), conn, conn->stream->u.fd, conn->stream->io_watcher.fd, conn->stream->accepted_fd, conn->stream->queued_fds); uv_close((uv_handle_t*)conn->stream, cliDestroy); } } @@ -1586,6 +1587,8 @@ static int32_t cliDoConn(SCliThrd* pThrd, SCliConn* conn) { TAOS_CHECK_GOTO(terrno, &lino, _exception1); } + tTrace("%s conn %p fd %d openend", pInst->label, conn, fd); + int ret = uv_tcp_open((uv_tcp_t*)conn->stream, fd); if (ret != 0) { tError("%s conn %p failed to set stream since %s", transLabel(pInst), conn, uv_err_name(ret)); diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index 9013ad03a2..7ed6be042d 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -75,8 +75,10 @@ extern "C" { #define MP_STAT_PROC_FLAG_RES_FAIL (1 << 3) // CTRL FUNC FLAGS -#define MP_CTRL_FLAG_PRINT_STAT (1 << 0) -#define MP_CTRL_FLAG_CHECK_STAT (1 << 1) +#define MP_CTRL_FLAG_PRINT_STAT (1 << 0) +#define MP_CTRL_FLAG_CHECK_STAT (1 << 1) +#define MP_CTRL_FLAG_LOCK_DBG (1 << 2) +#define MP_CTRL_FLAG_LOG_MAXSIZE (1 << 3) typedef enum EMPStatLogItem { @@ -199,7 +201,6 @@ typedef struct SMPStatInfo { typedef struct SMPJob { SMemPoolJob job; // KEEP IT FIRST - int32_t remainSession; SMPStatInfo stat; } SMPJob; @@ -281,7 +282,6 @@ typedef struct SMemPool { SMemPoolCfg cfg; //int64_t retireThreshold[3]; int64_t retireUnit; - SMPCtrlInfo ctrl; int64_t maxAllocMemSize; int64_t allocMemSize; @@ -306,6 +306,7 @@ typedef struct SMPMsgQueue { typedef struct SMemPoolMgmt { EMPMemStrategy strategy; + SMPCtrlInfo ctrl; SArray* poolList; SRWLatch poolLock; TdThread poolMgmtThread; @@ -316,6 +317,9 @@ typedef struct SMemPoolMgmt { int32_t code; } SMemPoolMgmt; +extern SMemPoolMgmt gMPMgmt; + + typedef int32_t (*mpAllocFunc)(SMemPool*, SMPSession*, int64_t*, uint32_t, void**); typedef void (*mpFreeFunc)(SMemPool*, SMPSession*, void *, int64_t*); typedef int64_t (*mpGetSizeFunc)(SMemPool*, SMPSession*, void*); @@ -376,17 +380,25 @@ enum { #define MP_TRY_LOCK(type, _lock, _res) \ do { \ if (MP_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \ - uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try read lock"); \ + uDebug("MP TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ (_res) = taosRTryLockLatch(_lock); \ - uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \ - ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + uDebug("MP TRY RLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \ + ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) > 0, "invalid lock value after try read lock"); \ + } \ } else { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \ - uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before try write lock"); \ + uDebug("MP TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ (_res) = taosWTryLockLatch(_lock); \ - uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \ - ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + uDebug("MP TRY WLOCK%p:%d %s, %s:%d E", (_lock), atomic_load_32(_lock), (_res) ? "failed" : "succeed", __FILE__, __LINE__); \ + ASSERTS((_res) ? atomic_load_32((_lock)) >= 0 : atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after try write lock"); \ + } \ } \ } while (0) @@ -394,34 +406,50 @@ enum { #define MP_LOCK(type, _lock) \ do { \ if (MP_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \ - uDebug("MP RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before read lock"); \ + uDebug("MP RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ taosRLockLatch(_lock); \ - uDebug("MP RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + uDebug("MP RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value after read lock"); \ + } \ } else { \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \ - uDebug("MP WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value before write lock"); \ + uDebug("MP WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ taosWLockLatch(_lock); \ - uDebug("MP WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + uDebug("MP WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value after write lock"); \ + } \ } \ } while (0) #define MP_UNLOCK(type, _lock) \ do { \ if (MP_READ == (type)) { \ - ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ - uDebug("MP RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + ASSERTS(atomic_load_32((_lock)) > 0, "invalid lock value before read unlock"); \ + uDebug("MP RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ taosRUnLockLatch(_lock); \ - uDebug("MP RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + uDebug("MP RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after read unlock"); \ + } \ } else { \ - ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ - uDebug("MP WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + ASSERTS(atomic_load_32((_lock)) == TD_RWLATCH_WRITE_FLAG_COPY, "invalid lock value before write unlock"); \ + uDebug("MP WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ taosWUnLockLatch(_lock); \ - uDebug("MP WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOCK_DBG)) { \ + uDebug("MP WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + ASSERTS(atomic_load_32((_lock)) >= 0, "invalid lock value after write unlock"); \ + } \ } \ } while (0) diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index e530ee017a..26320c49a3 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -233,9 +233,6 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET(mpUpdateCfg(pPool)); - pPool->ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS); - pPool->ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT; - pPool->sessionCache.groupNum = MP_SESSION_CACHE_ALLOC_BATCH_SIZE; pPool->sessionCache.nodeSize = sizeof(SMPSession); @@ -282,7 +279,9 @@ void mpUpdateAllocSize(SMemPool* pPool, SMPSession* pSession, int64_t size, int6 } int64_t allocMemSize = atomic_load_64(&pPool->allocMemSize); - mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize); + if (MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_LOG_MAXSIZE)) { + mpUpdateMaxAllocSize(&pPool->maxAllocMemSize, allocMemSize); + } } int32_t mpPutRetireMsgToQueue(SMemPool* pPool, bool retireLowLevel) { @@ -914,7 +913,7 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) { mpLogDetailStat(&pSession->stat.statDetail, item, pInput); } - if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) { + if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_MEM)) { mpLogDetailStat(&pPool->stat.statDetail, item, pInput); } if (pSession && MP_GET_FLAG(pSession->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { @@ -922,7 +921,7 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt mpLogPosStat(&pSession->stat.posStat, item, pInput, true); taosEnableMemPoolUsage(); } - if (MP_GET_FLAG(pPool->ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { + if (MP_GET_FLAG(gMPMgmt.ctrl.statFlags, MP_LOG_FLAG_ALL_POS)) { taosDisableMemPoolUsage(); mpLogPosStat(&pPool->stat.posStat, item, pInput, false); taosEnableMemPoolUsage(); @@ -967,7 +966,7 @@ void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { } if (NULL != poolHandle) { - pCtrl = &pPool->ctrl; + pCtrl = &gMPMgmt.ctrl; pDetail = &pPool->stat.statDetail; int64_t sessInit = atomic_load_64(&pPool->stat.statSession.initFail) + atomic_load_64(&pPool->stat.statSession.initSucc); if (MP_GET_FLAG(pCtrl->funcFlags, MP_CTRL_FLAG_CHECK_STAT) && sessInit == atomic_load_64(&pPool->stat.statSession.destroyNum)) { @@ -1084,6 +1083,9 @@ void mpModInit(void) { gMPMgmt.strategy = E_MP_STRATEGY_DIRECT; + gMPMgmt.ctrl.statFlags = MP_STAT_FLAG_LOG_ALL & (~MP_LOG_FLAG_ALL_POS); + gMPMgmt.ctrl.funcFlags = MP_CTRL_FLAG_PRINT_STAT | MP_CTRL_FLAG_CHECK_STAT | MP_CTRL_FLAG_LOG_MAXSIZE; + //gMPMgmt.code = tsem2_init(&gMPMgmt.threadSem, 0, 0); //if (TSDB_CODE_SUCCESS != gMPMgmt.code) { // uError("failed to init sem2, error: 0x%x", gMPMgmt.code); @@ -1122,16 +1124,16 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { if (NULL != pPool) { snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); detailName[sizeof(detailName) - 1] = 0; - mpPrintSessionStat(&pPool->ctrl, &pPool->stat.statSession, detailName); - mpPrintStatDetail(&pPool->ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); + mpPrintSessionStat(&gMPMgmt.ctrl, &pPool->stat.statSession, detailName); + mpPrintStatDetail(&gMPMgmt.ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); detailName[sizeof(detailName) - 1] = 0; - mpPrintNodeStat(&pPool->ctrl, pPool->stat.nodeStat, detailName); + mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos"); detailName[sizeof(detailName) - 1] = 0; - mpPrintPosStat(&pPool->ctrl, &pPool->stat.posStat, detailName); + mpPrintPosStat(&gMPMgmt.ctrl, &pPool->stat.posStat, detailName); } } @@ -1187,17 +1189,13 @@ void taosMemPoolCfgUpdate(void* poolHandle, SMemPoolCfg* pCfg) { (void)mpUpdateCfg(pPool); } -void taosMemPoolDestroySession(void* poolHandle, void* session, int32_t* remainSessions) { +void taosMemPoolDestroySession(void* poolHandle, void* session) { SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; if (NULL == poolHandle || NULL == pSession) { uWarn("null pointer of poolHandle %p or session %p", poolHandle, session); return; } - - if (remainSessions) { - *remainSessions = atomic_sub_fetch_32(&pSession->pJob->remainSession, 1); - } //TODO; @@ -1225,7 +1223,7 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c MP_ERR_JRET(terrno); } - TAOS_MEMCPY(&pSession->ctrl, &pPool->ctrl, sizeof(pSession->ctrl)); + TAOS_MEMCPY(&pSession->ctrl, &gMPMgmt.ctrl, sizeof(pSession->ctrl)); if (gMPFps[gMPMgmt.strategy].initSessionFp) { MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession)); @@ -1234,12 +1232,11 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true)); pSession->pJob = (SMPJob*)pJob; - (void)atomic_add_fetch_32(&pSession->pJob->remainSession, 1); _return: if (TSDB_CODE_SUCCESS != code) { - taosMemPoolDestroySession(poolHandle, pSession, NULL); + taosMemPoolDestroySession(poolHandle, pSession); pSession = NULL; (void)atomic_add_fetch_64(&pPool->stat.statSession.initFail, 1); } else { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 98393388bc..0181f2ea95 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -42,7 +42,8 @@ namespace { -#define MPT_PRINTF(param, ...) (void)printf("[%" PRId64 ",%" PRId64 "] " param, mptCtx.caseLoop, mptCtx.jobLoop, __VA_ARGS__) +#define MPT_PRINTF(param, ...) (void)printf("[%" PRId64 ",%" PRId64 "] " param, mptCaseLoop, mptExecLoop, __VA_ARGS__) +#define MPT_EPRINTF(param, ...) (void)printf(param, __VA_ARGS__) #define MPT_MAX_MEM_ACT_TIMES 300 #define MPT_MAX_SESSION_NUM 100 @@ -54,6 +55,7 @@ namespace { #define MPT_MIN_RESERVE_MEM_SIZE (512 * 1048576UL) #define MPT_MIN_MEM_POOL_SIZE (1048576UL) #define MPT_MAX_RETIRE_JOB_NUM 10000 +#define MPT_DEFAULT_TASK_RUN_TIMES 10 enum { MPT_READ = 1, @@ -63,100 +65,16 @@ enum { #define TD_RWLATCH_WRITE_FLAG_COPY 0x40000000 -static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) { - int32_t code = -1; - - if (MPT_READ == (type)) { - if (atomic_load_32((_lock)) < 0) { - uError("invalid lock value before try read lock"); - return -1; - } - uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - code = taosRTryLockLatch(_lock); - uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - if (atomic_load_32((_lock)) <= 0) { - uError("invalid lock value after try read lock"); - return -1; - } - } else { - if (atomic_load_32((_lock)) < 0) { - uError("invalid lock value before try write lock"); - return -1; - } - uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - code = taosWTryLockLatch(_lock); - uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); - if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { - uError("invalid lock value after try write lock"); - return -1; - } - } - - return code; -} - -#define MPT_LOCK(type, _lock) \ - do { \ - if (MPT_READ == (type)) { \ - if (atomic_load_32((_lock)) < 0) { \ - uError("invalid lock value before read lock"); \ - break; \ - } \ - uDebug("MPT RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosRLockLatch(_lock); \ - uDebug("MPT RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - if (atomic_load_32((_lock)) <= 0) { \ - uError("invalid lock value after read lock"); \ - break; \ - } \ - } else { \ - if (atomic_load_32((_lock)) < 0) { \ - uError("invalid lock value before write lock"); \ - break; \ - } \ - uDebug("MPT WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosWLockLatch(_lock); \ - uDebug("MPT WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ - uError("invalid lock value after write lock"); \ - break; \ - } \ - } \ - } while (0) - -#define MPT_UNLOCK(type, _lock) \ - do { \ - if (MPT_READ == (type)) { \ - if (atomic_load_32((_lock)) <= 0) { \ - uError("invalid lock value before read unlock"); \ - break; \ - } \ - uDebug("MPT RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosRUnLockLatch(_lock); \ - uDebug("MPT RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - if (atomic_load_32((_lock)) < 0) { \ - uError("invalid lock value after read unlock"); \ - break; \ - } \ - } else { \ - if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ - uError("invalid lock value before write unlock"); \ - break; \ - } \ - uDebug("MPT WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - taosWUnLockLatch(_lock); \ - uDebug("MPT WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ - if (atomic_load_32((_lock)) < 0) { \ - uError("invalid lock value after write unlock"); \ - break; \ - } \ - } \ - } while (0) - - threadlocal void* mptThreadPoolHandle = NULL; threadlocal void* mptThreadPoolSession = NULL; +threadlocal int32_t mptJobNum = 0; +threadlocal int32_t mptExecNum = 0; +threadlocal int32_t mptExecLoop = 0; +threadlocal int64_t mptCaseLoop = 0; + + + #define MPT_SET_TEID(id, tId, eId) \ @@ -209,6 +127,7 @@ typedef struct SMPTJobInfo { SRWLatch lock; int8_t destroyed; SHashObj* pSessions; + int8_t initDone; } SMPTJobInfo; @@ -219,9 +138,10 @@ typedef struct { int32_t jobNum; int32_t jobTaskNum; int64_t maxSingleAllocSize; - char* pSrcString; bool printExecDetail; bool printInputRow; + + bool lockDbg; } SMPTestCtrl; typedef struct { @@ -256,6 +176,7 @@ typedef struct { int32_t jobIdx; int64_t jobId; + int32_t initTimes; void* pSessions[MPT_MAX_SESSION_NUM]; int32_t taskNum; SMPTestTaskCtx taskCtxs[MPT_MAX_SESSION_NUM]; @@ -274,6 +195,12 @@ typedef struct { int32_t randTask; } SMPTestParam; +typedef struct { + int64_t initNum; + int64_t retireNum; + int64_t destoryNum; +} SMPTestJobStat; + typedef struct { int32_t idx; TdThread threadFp; @@ -289,17 +216,139 @@ typedef struct SMPTestCtx { SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; TdThread dropThreadFp; int32_t jobNum; + int64_t totalTaskNum; SMPTestJobCtx* jobCtxs; SMPTestParam param; - + SMPTestJobStat runStat; + + SRWLatch stringLock; + char* pSrcString; + + bool initDone; int8_t testDone; - int64_t caseLoop; int64_t jobLoop; } SMPTestCtx; SMPTestCtx mptCtx = {0}; SMPTestCtrl mptCtrl = {0}; +static int32_t MPT_TRY_LOCK(int32_t type, SRWLatch *_lock) { + int32_t code = -1; + + if (MPT_READ == (type)) { + if (mptCtrl.lockDbg) { + if (atomic_load_32((_lock)) < 0) { + uError("invalid lock value before try read lock"); + return -1; + } + uDebug("MPT TRY RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + } + code = taosRTryLockLatch(_lock); + if (mptCtrl.lockDbg) { + uDebug("MPT TRY RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + if (atomic_load_32((_lock)) <= 0) { + uError("invalid lock value after try read lock"); + return -1; + } + } + } else { + if (mptCtrl.lockDbg) { + if (atomic_load_32((_lock)) < 0) { + uError("invalid lock value before try write lock"); + return -1; + } + uDebug("MPT TRY WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + } + code = taosWTryLockLatch(_lock); + if (mptCtrl.lockDbg) { + uDebug("MPT TRY WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { + uError("invalid lock value after try write lock"); + return -1; + } + } + } + + return code; +} + +#define MPT_LOCK(type, _lock) \ + do { \ + if (MPT_READ == (type)) { \ + if (mptCtrl.lockDbg) { \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value before read lock"); \ + break; \ + } \ + uDebug("MPT RLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ + taosRLockLatch(_lock); \ + if (mptCtrl.lockDbg) { \ + uDebug("MPT RLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) <= 0) { \ + uError("invalid lock value after read lock"); \ + break; \ + } \ + } \ + } else { \ + if (mptCtrl.lockDbg) { \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value before write lock"); \ + break; \ + } \ + uDebug("MPT WLOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ + taosWLockLatch(_lock); \ + if (mptCtrl.lockDbg) { \ + uDebug("MPT WLOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + uError("invalid lock value after write lock"); \ + break; \ + } \ + } \ + } \ + } while (0) + +#define MPT_UNLOCK(type, _lock) \ + do { \ + if (MPT_READ == (type)) { \ + if (mptCtrl.lockDbg) { \ + if (atomic_load_32((_lock)) <= 0) { \ + uError("invalid lock value before read unlock"); \ + break; \ + } \ + uDebug("MPT RULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ + taosRUnLockLatch(_lock); \ + if (mptCtrl.lockDbg) { \ + uDebug("MPT RULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value after read unlock"); \ + break; \ + } \ + } \ + } else { \ + if (mptCtrl.lockDbg) { \ + if (atomic_load_32((_lock)) != TD_RWLATCH_WRITE_FLAG_COPY) { \ + uError("invalid lock value before write unlock"); \ + break; \ + } \ + uDebug("MPT WULOCK%p:%d, %s:%d B", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + } \ + taosWUnLockLatch(_lock); \ + if (mptCtrl.lockDbg) { \ + uDebug("MPT WULOCK%p:%d, %s:%d E", (_lock), atomic_load_32(_lock), __FILE__, __LINE__); \ + if (atomic_load_32((_lock)) < 0) { \ + uError("invalid lock value after write unlock"); \ + break; \ + } \ + } \ + } \ + } while (0) + + + + #if 0 void joinTestReplaceRetrieveFp() { static Stub stub; @@ -356,13 +405,23 @@ void mptDeleteJobQueueData(void* pData) { taosHashRelease(mptCtx.pJobs, pJob); } + +void mptDestroyJobInfo(void* job) { + SMPTJobInfo* pJob = (SMPTJobInfo*)job; + + taosMemFree(pJob->memInfo); + taosHashCleanup(pJob->pSessions); +} + + + void mptInit() { osDefaultInit(); mptInitLogFile(); mptCtrl.caseLoopTimes = 100000; mptCtrl.taskActTimes = 0; - mptCtrl.maxSingleAllocSize = 104857600; + mptCtrl.maxSingleAllocSize = 104857600 * 5; mptCtrl.jobNum = 100; mptCtrl.jobExecTimes = 10; mptCtrl.jobTaskNum = 0; @@ -370,51 +429,41 @@ void mptInit() { mptCtx.pJobs = taosHashInit(1024, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); ASSERT_TRUE(NULL != mptCtx.pJobs); + taosHashSetFreeFp(mptCtx.pJobs, mptDestroyJobInfo); + mptCtx.pJobQueue = createBoundedQueue(10000, mptJobMemSizeCompFn, mptDeleteJobQueueData, NULL); ASSERT_TRUE(NULL != mptCtx.pJobQueue); mptCtx.jobCtxs = (SMPTestJobCtx*)taosMemoryCalloc(MPT_MAX_JOB_NUM, sizeof(*mptCtx.jobCtxs)); ASSERT_TRUE(NULL != mptCtx.jobCtxs); - mptCtrl.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize); - ASSERT_TRUE(NULL != mptCtrl.pSrcString); - memset(mptCtrl.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1); - mptCtrl.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; + mptCtx.pSrcString = (char*)taosMemoryMalloc(mptCtrl.maxSingleAllocSize); + ASSERT_TRUE(NULL != mptCtx.pSrcString); + memset(mptCtx.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1); + mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; } - -void mptDestroyJobInfo(SMPTJobInfo* pJob) { - taosMemFree(pJob->memInfo); - taosHashCleanup(pJob->pSessions); -} - - - void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) { SMPTJobInfo *pJobInfo = pJobCtx->pJob; char id[sizeof(tId) + sizeof(eId) + 1] = {0}; MPT_SET_TEID(id, tId, eId); - int32_t remainSessions = 0; + int32_t remainSessions = atomic_sub_fetch_32(&pJobInfo->memInfo->remainSession, 1); (void)taosHashRemove(pJobInfo->pSessions, id, sizeof(id)); - taosMemPoolDestroySession(gMemPoolHandle, session, &remainSessions); + taosMemPoolDestroySession(gMemPoolHandle, session); if (0 == remainSessions) { - MPT_LOCK(MPT_WRITE, &pJobInfo->lock); if (0 == taosHashGetSize(pJobInfo->pSessions)) { atomic_store_8(&pJobInfo->destroyed, 1); uDebug("JOB:0x%x idx:%d destroyed, code:0x%x", pJobCtx->jobId, pJobCtx->jobIdx, pJobInfo->errCode); - mptDestroyJobInfo(pJobInfo); - MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock); - - pJobCtx->pJob = NULL; + atomic_add_fetch_64(&mptCtx.runStat.destoryNum, 1); (void)taosHashRemove(mptCtx.pJobs, &qId, sizeof(qId)); + + pJobCtx->pJob = NULL; uInfo("the whole query job removed"); - } else { - MPT_UNLOCK(MPT_WRITE, &pJobInfo->lock); } } } @@ -500,7 +549,7 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p break; } - pJobCtx->pJob = pJob; + atomic_store_ptr(&pJobCtx->pJob, pJob); pJob->pCtx = pJobCtx; char id[sizeof(tId) + sizeof(eId) + 1] = {0}; @@ -508,8 +557,12 @@ int32_t mptInitSession(uint64_t qId, uint64_t tId, int32_t eId, SMPTestJobCtx* p assert(0 == taosMemPoolInitSession(gMemPoolHandle, ppSession, pJob->memInfo, id)); + atomic_add_fetch_32(&pJob->memInfo->remainSession, 1); + assert(0 == taosHashPut(pJob->pSessions, id, sizeof(id), ppSession, POINTER_BYTES)); + atomic_store_8(&pJob->initDone, 1); + _return: if (NULL != pJob) { @@ -543,12 +596,20 @@ void mptInitJob(int32_t idx) { pJobCtx->jobIdx = idx; pJobCtx->jobId = atomic_add_fetch_64(&mptCtx.qId, 1); pJobCtx->taskNum = (mptCtrl.jobTaskNum) ? mptCtrl.jobTaskNum : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_SESSION_NUM/10)) : (taosRand() % MPT_MAX_SESSION_NUM)) + 1; + pJobCtx->initTimes++; + + if (!mptCtx.initDone) { + atomic_add_fetch_64(&mptCtx.totalTaskNum, pJobCtx->taskNum); + } + for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { mptInitTask(i, 0, pJobCtx); assert(pJobCtx->pJob); } - uDebug("JOB:0x%x idx:%d initialized, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->taskNum); + atomic_add_fetch_64(&mptCtx.runStat.initNum, 1); + + uDebug("JOB:0x%x idx:%d initialized, total times:%d, taskNum:%d", pJobCtx->jobId, idx, pJobCtx->initTimes, pJobCtx->taskNum); } @@ -565,18 +626,13 @@ void mptDestroyTask(SMPTestJobCtx* pJobCtx, int32_t taskIdx) { } int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { - if (MPT_TRY_LOCK(MPT_WRITE, &pJobCtx->jobExecLock)) { - return -1; - } - uint64_t jobId = pJobCtx->jobId; for (int32_t i = 0; i < pJobCtx->taskNum; ++i) { if (!pJobCtx->taskCtxs[i].destoryed) { mptDestroyTask(pJobCtx, i); } } - - + //mptDestroyJobInfo(pJobCtx->pJob); //(void)taosHashRemove(mptCtx.pJobs, &pJobCtx->jobId, sizeof(pJobCtx->jobId)); @@ -586,8 +642,6 @@ int32_t mptDestroyJob(SMPTestJobCtx* pJobCtx, bool reset) { mptInitJob(jobIdx); } - MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); - MPT_PRINTF(" JOB:0x%x retired\n", jobId); return 0; @@ -598,32 +652,50 @@ void mptCheckCompareJobInfo(SMPTestJobCtx* pJobCtx) { } int32_t mptResetJob(SMPTestJobCtx* pJobCtx) { - if (NULL == pJobCtx->pJob) { + if (MPT_TRY_LOCK(MPT_WRITE, &pJobCtx->jobExecLock)) { return -1; } - + + if (NULL == atomic_load_ptr(&pJobCtx->pJob)) { + int32_t jobIdx = pJobCtx->jobIdx; + memset((char*)pJobCtx + sizeof(pJobCtx->jobExecLock), 0, sizeof(SMPTestJobCtx) - sizeof(pJobCtx->jobExecLock)); + mptInitJob(jobIdx); + + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); + return 0; + } + + int32_t code = 0; if (atomic_load_8(&pJobCtx->pJob->retired)) { int32_t taskRunning = atomic_load_32(&pJobCtx->taskRunningNum); if (0 == taskRunning) { - return mptDestroyJob(pJobCtx, true); + code = mptDestroyJob(pJobCtx, true); } else { uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pJobCtx->jobId, taskRunning); - return -1; + code = -1; } } + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); + return 0; } void mptRetireJob(SMPTJobInfo* pJob) { SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx; + if (MPT_TRY_LOCK(MPT_WRITE, &pCtx->jobExecLock)) { + return; + } + int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum); if (0 == taskRunning) { mptDestroyJob(pCtx, false); } else { uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pCtx->jobId, taskRunning); } + + MPT_UNLOCK(MPT_WRITE, &pCtx->jobExecLock); } int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { @@ -662,7 +734,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { uint64_t jobId = 0; int64_t retiredSize = 0; while (retiredSize < retireSize && NULL != pJob) { - if (atomic_load_8(&pJob->retired)) { + if (atomic_load_8(&pJob->retired) || 0 == atomic_load_8(&pJob->initDone)) { pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); continue; } @@ -671,33 +743,35 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); jobId = pJob->memInfo->jobId; + atomic_add_fetch_64(&mptCtx.runStat.retireNum, 1); + mptRetireJob(pJob); retiredSize += aSize; - uDebug("QID:0x%" PRIx64 " job retired cause of mid level memory retire, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64, + uDebug("QID:0x%" PRIx64 " job retired cause of limit reached, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64, jobId, aSize, retireSize, retiredSize); } pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, NULL); } + + taosHashCancelIterate(mptCtx.pJobs, pJob); } void mptRetireJobCb(uint64_t jobId, uint64_t clientId, int32_t errCode) { - char id[sizeof(jobId) + sizeof(clientId) + 1] = {0}; - MPT_SET_QCID(id, jobId, clientId); - - SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, id, sizeof(id)); + SMPTJobInfo* pJob = (SMPTJobInfo*)taosHashGet(mptCtx.pJobs, &jobId, sizeof(jobId)); if (NULL == pJob) { - uError("QID:0x%" PRIx64 " CID:0x%" PRIx64 " fail to get job from job hash", jobId, clientId); + uError("QID:0x%" PRIx64 " fail to get job from job hash", jobId); return; } if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { - uInfo("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + uInfo("QID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); + atomic_add_fetch_64(&mptCtx.runStat.retireNum, 1); } else { - uDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); + uDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } } @@ -722,7 +796,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { while (!actDone) { actId = taosRand() % 10; - size = (taosRand() % 10) ? (taosRand() % (mptCtrl.maxSingleAllocSize / 100)) : (taosRand() % mptCtrl.maxSingleAllocSize); + size = (taosRand() % 8) ? (taosRand() % (mptCtrl.maxSingleAllocSize / 100)) : (taosRand() % mptCtrl.maxSingleAllocSize); switch (actId) { case 0: { // malloc @@ -736,7 +810,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->stat.bytes.memMalloc.exec+=size; pTask->stat.times.memMalloc.fail++; pTask->stat.bytes.memMalloc.fail+=size; - uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x mpMalloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); return; } @@ -764,7 +838,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->stat.bytes.memCalloc.exec+=size; pTask->stat.times.memCalloc.fail++; pTask->stat.bytes.memCalloc.fail+=size; - uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x mpCalloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); return; } @@ -794,7 +868,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->stat.bytes.memRealloc.exec+=size; pTask->stat.times.memRealloc.fail++; pTask->stat.bytes.memRealloc.fail+=size; - uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x new mpRealloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); return; } @@ -833,7 +907,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->stat.bytes.memFree.exec+=osize; pTask->stat.times.memFree.succ++; pTask->stat.bytes.memFree.succ+=osize; - uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x real mpRealloc %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); pTask->memIdx--; return; } @@ -879,16 +953,18 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } size /= 10; - mptCtrl.pSrcString[size] = 0; - pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtrl.pSrcString); - mptCtrl.pSrcString[size] = 'W'; + MPT_LOCK(MPT_WRITE, &mptCtx.stringLock); + mptCtx.pSrcString[size] = 0; + pTask->pMemList[pTask->memIdx].p = mptStrdup(mptCtx.pSrcString); + mptCtx.pSrcString[size] = 'W'; + MPT_UNLOCK(MPT_WRITE, &mptCtx.stringLock); if (NULL == pTask->pMemList[pTask->memIdx].p) { pTask->stat.times.memStrdup.exec++; pTask->stat.bytes.memStrdup.exec+=size + 1; pTask->stat.times.memStrdup.fail++; pTask->stat.bytes.memStrdup.fail+=size + 1; - uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x mpStrdup %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); return; } @@ -912,15 +988,18 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { } size /= 10; - assert(strlen(mptCtrl.pSrcString) > size); - pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtrl.pSrcString, size); + + MPT_LOCK(MPT_WRITE, &mptCtx.stringLock); + assert(strlen(mptCtx.pSrcString) > size); + pTask->pMemList[pTask->memIdx].p = mptStrndup(mptCtx.pSrcString, size); + MPT_UNLOCK(MPT_WRITE, &mptCtx.stringLock); if (NULL == pTask->pMemList[pTask->memIdx].p) { pTask->stat.times.memStrndup.exec++; pTask->stat.bytes.memStrndup.exec+=size + 1; pTask->stat.times.memStrndup.fail++; pTask->stat.bytes.memStrndup.fail+=size + 1; - uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x mpStrndup %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); return; } @@ -985,7 +1064,7 @@ void mptSimulateAction(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { pTask->stat.bytes.memMalloc.exec+=size; pTask->stat.times.memMalloc.fail++; pTask->stat.bytes.memMalloc.fail+=size; - uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, terrno:0x%x", pJobCtx->jobId, pTask->taskId, size, terrno); + uError("JOB:0x%x TASK:0x%x mpMallocAlign %d failed, error:%s", pJobCtx->jobId, pTask->taskId, size, tstrerror(terrno)); return; } @@ -1030,7 +1109,7 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { return; } - if (taosRand() % 10 > 0) { + if (taosRand() % 2) { return; } @@ -1071,9 +1150,9 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32 mptSimulateTask(pJobCtx, pCtx, actTimes); mptDisableMemoryPoolUsage(); - if (!atomic_load_8(&pJobCtx->pJob->retired)) { +// if (!atomic_load_8(&pJobCtx->pJob->retired)) { mptSimulateOutTask(pJobCtx, pCtx); - } +// } taosWUnLockLatch(&pCtx->taskExecLock); @@ -1087,6 +1166,7 @@ void mptInitJobs() { int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; memset(mptCtx.jobCtxs, 0, sizeof(*mptCtx.jobCtxs) * jobNum); + mptCtx.totalTaskNum = 0; for (int32_t i = 0; i < jobNum; ++i) { mptInitJob(i); @@ -1163,61 +1243,109 @@ void mptCheckPoolUsedSize(int32_t jobNum) { } } -void* mptRunThreadFunc(void* param) { - SMPTestThread* pThread = (SMPTestThread*)param; - int32_t jobExecTimes = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; +void mptLaunchSingleTask(SMPTestThread* pThread, SMPTestJobCtx* pJobCtx, int32_t taskIdx, int32_t actTimes) { + if (atomic_load_8(&pJobCtx->pJob->retired) || pJobCtx->taskCtxs[taskIdx].destoryed) { + return; + } + + MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, taskIdx, pJobCtx->taskNum); + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes); + MPT_PRINTF("Thread %d end %d:%d task\n", pThread->idx, taskIdx, pJobCtx->taskNum); + +} - for (int32_t n = 0; n < jobExecTimes; ++n) { - mptCtx.jobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : (taosRand() % MPT_MAX_JOB_NUM + 1); - mptCtx.jobLoop = n; +void mptRunRandTasks(SMPTestThread* pThread) { + int64_t runTaskTimes = mptCtx.totalTaskNum * MPT_DEFAULT_TASK_RUN_TIMES, taskExecIdx = 0; + int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; + int32_t jobIdx = 0, taskIdx = 0, code = 0; + SMPTestJobCtx* pJobCtx = NULL; - MPT_PRINTF("Thread %d start the %d:%d job loops - jobNum:%d\n", pThread->idx, n, jobExecTimes, mptCtx.jobNum); + MPT_PRINTF("Thread %d start the %d:%d exection - runTaskTimes:%" PRId64 "\n", pThread->idx, mptExecLoop, mptExecNum, runTaskTimes); - for (int32_t i = 0; i < mptCtx.jobNum; ++i) { - SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; - MPT_PRINTF(" Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, mptCtx.jobNum, pJobCtx->jobIdx, pJobCtx->jobId); + while (runTaskTimes > 0) { + int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); + jobIdx = taosRand() % jobNum; - if (mptResetJob(pJobCtx)) { - continue; - } + pJobCtx = &mptCtx.jobCtxs[jobIdx]; - if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) { - continue; - } - - if (mptCtx.param.randTask) { - int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); - int32_t taskIdx = taosRand() % pJobCtx->taskNum; - mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes); - MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); - continue; - } - - for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { - if (atomic_load_8(&pJobCtx->pJob->retired)) { - break; - } - - if (pJobCtx->taskCtxs[m].destoryed) { - continue; - } - - int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); - - MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, m, pJobCtx->taskNum); - mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[m], m, actTimes); - } - - MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); - - MPT_PRINTF(" Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired); - - mptResetJob(pJobCtx); + if (mptResetJob(pJobCtx)) { + continue; } - MPT_PRINTF("Thread %d finish the %dth job loops\n", pThread->idx, n); + if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) { + continue; + } + + taskIdx = taosRand() % pJobCtx->taskNum; + + if (atomic_load_8(&pJobCtx->pJob->retired) || pJobCtx->taskCtxs[taskIdx].destoryed) { + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); + continue; + } + + MPT_PRINTF("Thread %d start to run %d:%d task\n", pThread->idx, taskExecIdx, runTaskTimes); + mptTaskRun(pJobCtx, &pJobCtx->taskCtxs[taskIdx], taskIdx, actTimes); + MPT_PRINTF("Thread %d end %d:%d task\n", pThread->idx, taskExecIdx, runTaskTimes); + + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); - mptCheckPoolUsedSize(mptCtx.jobNum); + runTaskTimes--; + taskExecIdx++; + } + +} + +void mptRunLoopJobs(SMPTestThread* pThread) { + mptJobNum = (mptCtrl.jobNum) ? mptCtrl.jobNum : (taosRand() % MPT_MAX_JOB_NUM + 1); + + MPT_PRINTF("Thread %d start the %d:%d exection - jobNum:%d\n", pThread->idx, mptExecLoop, mptExecNum, mptJobNum); + + for (int32_t i = 0; i < mptJobNum; ++i) { + SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[i]; + + if (mptResetJob(pJobCtx)) { + continue; + } + + if (MPT_TRY_LOCK(MPT_READ, &pJobCtx->jobExecLock)) { + continue; + } + + MPT_PRINTF(" Thread %d start to run %d:%d job[%d:0x%" PRIx64 "]\n", pThread->idx, i, mptJobNum, pJobCtx->jobIdx, pJobCtx->jobId); + + for (int32_t m = 0; m < pJobCtx->taskNum; ++m) { + if (atomic_load_8(&pJobCtx->pJob->retired)) { + break; + } + + int32_t actTimes = mptCtrl.taskActTimes ? mptCtrl.taskActTimes : ((taosRand() % 10) ? (taosRand() % (MPT_MAX_MEM_ACT_TIMES/10)) : (taosRand() % MPT_MAX_MEM_ACT_TIMES)); + mptLaunchSingleTask(pThread, pJobCtx, m, actTimes); + } + + MPT_UNLOCK(MPT_READ, &pJobCtx->jobExecLock); + + MPT_PRINTF(" Thread %d end %dth JOB 0x%x exec, retired:%d\n", pThread->idx, pJobCtx->jobIdx, pJobCtx->jobId, pJobCtx->pJob->retired); + } +} + +void* mptRunThreadFunc(void* param) { + SMPTestThread* pThread = (SMPTestThread*)param; + mptExecNum = (mptCtrl.jobExecTimes) ? mptCtrl.jobExecTimes : taosRand() % MPT_MAX_JOB_LOOP_TIMES + 1; + + for (int32_t n = 0; n < mptExecNum; ++n) { + mptExecLoop = n; + + if (mptCtx.param.randTask) { + mptRunRandTasks(pThread); + } else { + mptRunLoopJobs(pThread); + } + + MPT_PRINTF("Thread %d finish the %dth exection\n", pThread->idx, n); + + if (mptCtx.param.threadNum <= 1) { + mptCheckPoolUsedSize(mptJobNum); + } } return NULL; @@ -1226,15 +1354,19 @@ void* mptRunThreadFunc(void* param) { void* mptDropThreadFunc(void* param) { int32_t jobIdx = 0, taskIdx = 0, code = 0; uint64_t taskId = 0; + int32_t jobNum = mptCtrl.jobNum ? mptCtrl.jobNum : MPT_MAX_JOB_NUM; while (!atomic_load_8(&mptCtx.testDone)) { - taosMsleep(200); + taosMsleep(400); + + MPT_EPRINTF("%" PRId64 " - initJobs:%" PRId64 " retireJobs:%" PRId64 " destoryJobs:%" PRId64 " remainJobs:%" PRId64 "\n", taosGetTimestampMs(), + mptCtx.runStat.initNum, mptCtx.runStat.retireNum, mptCtx.runStat.destoryNum, mptCtx.runStat.initNum - mptCtx.runStat.destoryNum); if (taosMemPoolTryLockPool(gMemPoolHandle, true)) { continue; } - jobIdx = taosRand() % mptCtx.jobNum; + jobIdx = taosRand() % jobNum; SMPTestJobCtx* pJobCtx = &mptCtx.jobCtxs[jobIdx]; MPT_LOCK(MPT_WRITE, &pJobCtx->jobExecLock); if (NULL == pJobCtx->pJob || pJobCtx->pJob->destroyed) { @@ -1253,15 +1385,15 @@ void* mptDropThreadFunc(void* param) { taskId = pJobCtx->taskCtxs[taskIdx].taskId; mptDestroyTask(pJobCtx, taskIdx); - MPT_PRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId); + MPT_EPRINTF("Drop Thread destroy task %d:0x%" PRIx64 " in job %d:%" PRIx64 "\n", taskIdx, taskId, jobIdx, pJobCtx->jobId); MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); } else { - MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); code = mptDestroyJob(pJobCtx, false); if (0 == code) { - MPT_PRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId); + MPT_EPRINTF("Drop Thread destroy job %d:%" PRIx64 "\n", jobIdx, pJobCtx->jobId); } + MPT_UNLOCK(MPT_WRITE, &pJobCtx->jobExecLock); } taosMemPoolUnLockPool(gMemPoolHandle, true); @@ -1298,21 +1430,26 @@ void mptDestroyJobs() { for (int32_t i = 0; i < jobNum; ++i) { mptDestroyJob(&mptCtx.jobCtxs[i], false); } + + } void mptRunCase(SMPTestParam* param, int32_t times) { MPT_PRINTF("\t case start the %dth running\n", times); - mptCtx.caseLoop = times; + mptCaseLoop = times; memcpy(&mptCtx.param, param, sizeof(SMPTestParam)); tsSingleQueryMaxMemorySize = param->jobQuota; atomic_store_8(&mptCtx.testDone, 0); + mptCtx.initDone = false; mptInitPool(); mptInitJobs(); + mptCtx.initDone = true; + for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { mptStartRunThread(i); } @@ -1381,7 +1518,7 @@ TEST(FuncTest, SysMemoryPerfTest) { } #endif -#if 1 +#if 0 TEST(FuncTest, SingleThreadTest) { char* caseName = "FuncTest:SingleThreadTest"; SMPTestParam param = {0}; @@ -1397,6 +1534,24 @@ TEST(FuncTest, SingleThreadTest) { } #endif +#if 1 +TEST(FuncTest, MultiThreadTest) { + char* caseName = "FuncTest:MultiThreadTest"; + SMPTestParam param = {0}; + param.reserveMode = true; + param.threadNum = 6; + param.jobQuota = 1024; + param.randTask = true; + + mptPrintTestBeginInfo(caseName, ¶m); + + for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { + mptRunCase(¶m, i); + } + +} +#endif + #if 0 TEST(FuncTest, MultiThreadsTest) { char* caseName = "FuncTest:MultiThreadsTest";