diff --git a/source/libs/qworker/inc/qwInt.h b/source/libs/qworker/inc/qwInt.h index 1604769c01..539d954246 100644 --- a/source/libs/qworker/inc/qwInt.h +++ b/source/libs/qworker/inc/qwInt.h @@ -530,8 +530,8 @@ void qwDbgSimulateDead(QW_FPARAMS_DEF, SQWTaskCtx *ctx, bool *rsped); int32_t qwSendExplainResponse(QW_FPARAMS_DEF, SQWTaskCtx *ctx); int32_t qwInitQueryPool(void); void qwDestroyJobInfo(void* job); -void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx); -void qwRetireJob(SQWJobInfo* pJob); +bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx); +bool qwRetireJob(SQWJobInfo* pJob); void qwDestroySession(QW_FPARAMS_DEF, SQWJobInfo *pJobInfo, void* session); int32_t qwInitSession(QW_FPARAMS_DEF, SQWTaskCtx *ctx, void** ppSession); void qwFreeTaskHandle(SQWTaskCtx *ctx); diff --git a/source/libs/qworker/src/qwUtil.c b/source/libs/qworker/src/qwUtil.c index 87cef4c32e..a9b4be0645 100644 --- a/source/libs/qworker/src/qwUtil.c +++ b/source/libs/qworker/src/qwUtil.c @@ -739,8 +739,9 @@ void qwDestroyJobInfo(void* job) { pJob->pSessions = NULL; } -void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { +bool qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { int32_t code = TSDB_CODE_SUCCESS; + bool taskFreed = false; QW_LOCK(QW_WRITE, &ctx->lock); @@ -750,7 +751,7 @@ void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_TASK_WLOG_E("task already dropping"); QW_UNLOCK(QW_WRITE, &ctx->lock); - return; + return taskFreed; } if (QW_QUERY_RUNNING(ctx)) { @@ -770,37 +771,47 @@ void qwStopTask(QW_FPARAMS_DEF, SQWTaskCtx *ctx) { QW_TASK_ELOG("task drop failed, error: %x", code); } else { QW_TASK_DLOG_E("task dropped"); + taskFreed = true; } } QW_UNLOCK(QW_WRITE, &ctx->lock); + + return taskFreed; } -void qwRetireTask(QW_FPARAMS_DEF) { +bool qwRetireTask(QW_FPARAMS_DEF) { SQWTaskCtx *ctx = NULL; int32_t code = qwAcquireTaskCtx(QW_FPARAMS(), &ctx); if (TSDB_CODE_SUCCESS != code) { - return; + return false; } - qwStopTask(QW_FPARAMS(), ctx); + bool retired = qwStopTask(QW_FPARAMS(), ctx); qwReleaseTaskCtx(mgmt, ctx); + + return retired; } -void qwRetireJob(SQWJobInfo* pJob) { +bool qwRetireJob(SQWJobInfo* pJob) { if (NULL == pJob) { - return; + return false; } + bool retired = true; void* pIter = taosHashIterate(pJob->pSessions, NULL); while (pIter) { SQWSessionInfo* pSession = (SQWSessionInfo*)pIter; - qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId); + if (!qwRetireTask((SQWorker *)pSession->mgmt, pSession->sId, pSession->qId, pSession->cId, pSession->tId, pSession->rId, pSession->eId)) { + retired = false; + } pIter = taosHashIterate(pJob->pSessions, pIter); } + + return retired; } diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 28fff21239..100de805f8 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -35,7 +35,7 @@ void qwStopAllTasks(SQWorker *mgmt) { sId = ctx->sId; - qwStopTask(QW_FPARAMS(), ctx); + (void)qwStopTask(QW_FPARAMS(), ctx); pIter = taosHashIterate(mgmt->ctxHash, pIter); } @@ -1633,7 +1633,7 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " mark retired, errCode: 0x%x, allocSize:%" PRId64, jobId, clientId, errCode, atomic_load_64(&pJob->memInfo->allocMemSize)); - qwRetireJob(pJob); + (void)qwRetireJob(pJob); } else { qDebug("QID:0x%" PRIx64 " already retired, retired: %d, errCode: 0x%x, allocSize:%" PRId64, jobId, atomic_load_8(&pJob->retired), atomic_load_32(&pJob->errCode), atomic_load_64(&pJob->memInfo->allocMemSize)); } @@ -1641,6 +1641,7 @@ void qWorkerRetireJob(uint64_t jobId, uint64_t clientId, int32_t errCode) { void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { SQWJobInfo* pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, NULL); + int32_t jobNum = 0; int64_t retiredSize = 0; while (retiredSize < retireSize && NULL != pJob) { if (atomic_load_8(&pJob->retired)) { @@ -1650,17 +1651,21 @@ void qWorkerRetireJobs(int64_t retireSize, int32_t errCode) { if (0 == atomic_val_compare_exchange_32(&pJob->errCode, 0, errCode) && 0 == atomic_val_compare_exchange_8(&pJob->retired, 0, 1)) { int64_t aSize = atomic_load_64(&pJob->memInfo->allocMemSize); + bool retired = qwRetireJob(pJob); + if (retired) { + retiredSize += aSize; + } + + jobNum++; - qwRetireJob(pJob); - - retiredSize += aSize; - - qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job retired in batch, usedSize:%" PRId64 ", retireSize:%" PRId64, - pJob->memInfo->jobId, pJob->memInfo->clientId, aSize, retireSize); + qDebug("QID:0x%" PRIx64 " CID:0x%" PRIx64 " job mark retired in batch, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64, + pJob->memInfo->jobId, pJob->memInfo->clientId, retired, aSize, retireSize); } pJob = (SQWJobInfo*)taosHashIterate(gQueryMgmt.pJobInfo, pJob); } + + qDebug("total %d jobs mark retired, direct retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize); } diff --git a/source/util/inc/tmempoolInt.h b/source/util/inc/tmempoolInt.h index afad7549b5..a2fa765077 100755 --- a/source/util/inc/tmempoolInt.h +++ b/source/util/inc/tmempoolInt.h @@ -229,7 +229,7 @@ typedef struct SMPSessionChunk { } SMPSessionChunk; typedef struct SMPSession { - SMPListNode list; + //SMPListNode list; char* sessionId; SMPJob* pJob; @@ -237,7 +237,7 @@ typedef struct SMPSession { int64_t allocMemSize; int64_t maxAllocMemSize; - SMPSessionChunk chunk; + //SMPSessionChunk chunk; SMPStatInfo stat; } SMPSession; diff --git a/source/util/src/mpChunk.c b/source/util/src/mpChunk.c index d50c118971..2c1c415c04 100755 --- a/source/util/src/mpChunk.c +++ b/source/util/src/mpChunk.c @@ -19,7 +19,7 @@ #include "tlog.h" #include "tutil.h" - +#if 0 int32_t mpChunkNew(SMemPool* pPool, SMPChunk** ppChunk) { SMPChunk* pChunk = NULL; MP_ERR_RET(mpPopIdleNode(pPool, &pPool->chunk.chunkCache, (void**)&pChunk)); @@ -315,4 +315,5 @@ int32_t mpChunkUpdateCfg(SMemPool* pPool) { return TSDB_CODE_SUCCESS; } +#endif diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index 4efe4b528c..b54cbe33ab 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -30,7 +30,7 @@ SMemPoolMgmt gMPMgmt = {0}; SMPStrategyFp gMPFps[] = { {NULL}, {NULL, mpDirectFullAlloc, mpDirectFullFree, mpDirectGetMemSize, mpDirectFullRealloc, NULL, NULL, mpDirectTrim}, - {mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL} + //{mpChunkInit, mpChunkAlloc, mpChunkFree, mpChunkGetMemSize, mpChunkRealloc, mpChunkInitSession, mpChunkUpdateCfg, NULL} }; @@ -242,7 +242,9 @@ int32_t mpInit(SMemPool* pPool, char* poolName, SMemPoolCfg* cfg) { MP_ERR_RET((*gMPFps[gMPMgmt.strategy].initFp)(pPool, poolName, cfg)); } - MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false)); + if (tsMemPoolFullFunc) { + MP_ERR_RET(mpInitPosStat(&pPool->stat.posStat, false)); + } return TSDB_CODE_SUCCESS; } @@ -943,6 +945,10 @@ void mpLogStat(SMemPool* pPool, SMPSession* pSession, EMPStatLogItem item, SMPSt } void mpCheckStatDetail(void* poolHandle, void* session, char* detailName) { + if (0 == tsMemPoolFullFunc) { + return; + } + SMemPool* pPool = (SMemPool*)poolHandle; SMPSession* pSession = (SMPSession*)session; SMPCtrlInfo* pCtrl = NULL; @@ -1115,7 +1121,7 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { SMPSession* pSession = (SMPSession*)session; char detailName[128]; - if (NULL != pSession) { + if (NULL != pSession && MP_GET_FLAG(pSession->ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "Session"); detailName[sizeof(detailName) - 1] = 0; mpPrintStatDetail(&pSession->ctrl, &pSession->stat.statDetail, detailName, pSession->maxAllocMemSize); @@ -1125,15 +1131,15 @@ void taosMemPoolPrintStat(void* poolHandle, void* session, char* procName) { mpPrintPosStat(&pSession->ctrl, &pSession->stat.posStat, detailName); } - if (NULL != pPool) { + if (NULL != pPool && MP_GET_FLAG(gMPMgmt.ctrl.funcFlags, MP_CTRL_FLAG_PRINT_STAT)) { snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, pPool->name); detailName[sizeof(detailName) - 1] = 0; mpPrintSessionStat(&gMPMgmt.ctrl, &pPool->stat.statSession, detailName); mpPrintStatDetail(&gMPMgmt.ctrl, &pPool->stat.statDetail, detailName, pPool->maxAllocMemSize); - snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); - detailName[sizeof(detailName) - 1] = 0; - mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName); + //snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolNode"); + //detailName[sizeof(detailName) - 1] = 0; + //mpPrintNodeStat(&gMPMgmt.ctrl, pPool->stat.nodeStat, detailName); snprintf(detailName, sizeof(detailName) - 1, "%s - %s", procName, "MemPoolPos"); detailName[sizeof(detailName) - 1] = 0; @@ -1200,14 +1206,13 @@ void taosMemPoolDestroySession(void* poolHandle, void* session) { uWarn("null pointer of poolHandle %p or session %p", poolHandle, session); return; } + + if (tsMemPoolFullFunc) { + (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); + mpCheckStatDetail(pPool, pSession, "DestroySession"); + mpDestroyPosStat(&pSession->stat.posStat); + } - //TODO; - - (void)atomic_add_fetch_64(&pPool->stat.statSession.destroyNum, 1); - - mpCheckStatDetail(pPool, pSession, "DestroySession"); - - mpDestroyPosStat(&pSession->stat.posStat); taosMemFreeClear(pSession->sessionId); TAOS_MEMSET(pSession, 0, sizeof(*pSession)); @@ -1233,7 +1238,9 @@ int32_t taosMemPoolInitSession(void* poolHandle, void** ppSession, void* pJob, c MP_ERR_JRET((*gMPFps[gMPMgmt.strategy].initSessionFp)(pPool, pSession)); } - MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true)); + if (tsMemPoolFullFunc) { + MP_ERR_JRET(mpInitPosStat(&pSession->stat.posStat, true)); + } pSession->pJob = (SMPJob*)pJob; @@ -1526,10 +1533,11 @@ void taosMemPoolClose(void* poolHandle) { SMemPool* pPool = (SMemPool*)poolHandle; - mpCheckStatDetail(pPool, NULL, "PoolClose"); - - mpDestroyPosStat(&pPool->stat.posStat); - + if (tsMemPoolFullFunc) { + mpCheckStatDetail(pPool, NULL, "PoolClose"); + mpDestroyPosStat(&pPool->stat.posStat); + } + taosMemoryFree(pPool->name); mpDestroyCacheGroup(&pPool->sessionCache); diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 2c079270cb..5b9ddecfd1 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -57,7 +57,7 @@ namespace { #define MPT_MAX_RETIRE_JOB_NUM 10000 #define MPT_DEFAULT_TASK_RUN_TIMES 10 #define MPT_NON_POOL_ALLOC_UNIT (256 * 1048576UL) -#define MPT_NON_POOL_KEEP_ALLOC_UNIT 10485760UL +#define MPT_NON_POOL_KEEP_ALLOC_UNIT (10485760UL * 8) #define MPT_MAX_NON_POOL_ALLOC_TIMES 30000 enum { @@ -685,21 +685,25 @@ int32_t mptResetJob(SMPTestJobCtx* pJobCtx) { return 0; } -void mptRetireJob(SMPTJobInfo* pJob) { +bool mptRetireJob(SMPTJobInfo* pJob) { SMPTestJobCtx* pCtx = (SMPTestJobCtx*)pJob->pCtx; if (MPT_TRY_LOCK(MPT_WRITE, &pCtx->jobExecLock)) { - return; + return false; } + bool retired = false; int32_t taskRunning = atomic_load_32(&pCtx->taskRunningNum); if (0 == taskRunning) { mptDestroyJob(pCtx, false); + retired = true; } else { uDebug("JOB:0x%x retired but will not destroy cause of task running, num:%d", pCtx->jobId, taskRunning); } MPT_UNLOCK(MPT_WRITE, &pCtx->jobExecLock); + + return retired; } int32_t mptGetMemPoolMaxMemSize(void* pHandle, int64_t* maxSize) { @@ -750,13 +754,15 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { atomic_add_fetch_64(&mptCtx.runStat.retireNum, 1); - mptRetireJob(pJob); - - retiredSize += aSize; + bool retired = mptRetireJob(pJob); + if (retired) { + retiredSize += aSize; + } + jobNum++; - uDebug("QID:0x%" PRIx64 " job retired cause of limit reached, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64, - jobId, aSize, retireSize, retiredSize); + uDebug("QID:0x%" PRIx64 " job mark retired cause of limit reached, retired:%d, usedSize:%" PRId64 ", retireSize:%" PRId64 ", retiredSize:%" PRId64, + jobId, retired, aSize, retireSize, retiredSize); } pJob = (SMPTJobInfo*)taosHashIterate(mptCtx.pJobs, pJob); @@ -764,7 +770,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { taosHashCancelIterate(mptCtx.pJobs, pJob); - uDebug("total %d jobs retired, retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize); + uDebug("total %d jobs mark retired, retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize); } @@ -1575,7 +1581,7 @@ TEST(PerfTest, GetSysAvail) { for (int32_t i = 0; i < loopTimes; ++i) { code = taosGetSysAvailMemory(&freeSize); assert(0 == code); - taosMsleep(5); + //taosMsleep(10); } totalUs = taosGetTimestampUs() - st; @@ -1915,7 +1921,7 @@ TEST(poolFuncTest, SingleThreadTest) { } #endif -#if 1 +#if 0 TEST(poolFuncTest, MultiThreadTest) { char* caseName = "poolFuncTest:MultiThreadTest"; SMPTestParam param = {0};