diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 699337fe8a..b708536d3b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -746,8 +746,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE)); + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); //TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minReservedMemorySize", tsMinReservedMemorySize, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0); @@ -1708,6 +1710,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryUseMemoryPool"); tsQueryUseMemoryPool = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "memPoolFullFunc"); + tsMemPoolFullFunc = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "singleQueryMaxMemorySize"); tsSingleQueryMaxMemorySize = pItem->i32; @@ -2083,8 +2088,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"supportVnodes", &tsNumOfSupportVnodes}, {"experimental", &tsExperimental}, {"maxTsmaNum", &tsMaxTsmaNum}, - {"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize}, - {"minReservedMemorySize", &tsMinReservedMemorySize}, {"safetyCheckLevel", &tsSafetyCheckLevel}, {"bypassFlag", &tsBypassFlag}}; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 7643d6a9c0..e79041e563 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -588,11 +588,11 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); if (0 != code) { if (HASH_NODE_EXIST(code)) { - SCH_TASK_ELOG("task already in execTask list, code:%x", code); - SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); + SCH_TASK_DLOG("task already in execTask list, code:%x", code); + return TSDB_CODE_SUCCESS; } - SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno); + SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:0x%x", errno); SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY); } @@ -1312,6 +1312,11 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { pTask->delayLaunchPar.queryId = pJob->queryId; pTask->delayLaunchPar.taskId = pTask->taskId; + if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) { + SCH_ERR_RET(schPushTaskToExecList(pJob, pTask)); + SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC); + } + if (NULL == pTask->delayTimer) { pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer); if (NULL == pTask->delayTimer) { diff --git a/source/util/src/tmempool.c b/source/util/src/tmempool.c index e21a21cab0..4efe4b528c 100644 --- a/source/util/src/tmempool.c +++ b/source/util/src/tmempool.c @@ -1016,6 +1016,8 @@ void mpUpdateSystemAvailableMemorySize() { } atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize); + + uDebug("system available memory size: %" PRId64, sysAvailSize); } void* mpMgmtThreadFunc(void* param) { diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 57baf9612b..2c079270cb 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -56,6 +56,9 @@ namespace { #define MPT_MIN_MEM_POOL_SIZE (1048576UL) #define MPT_MAX_RETIRE_JOB_NUM 10000 #define MPT_DEFAULT_TASK_RUN_TIMES 10 +#define MPT_NON_POOL_ALLOC_UNIT (256 * 1048576UL) +#define MPT_NON_POOL_KEEP_ALLOC_UNIT 10485760UL +#define MPT_MAX_NON_POOL_ALLOC_TIMES 30000 enum { MPT_READ = 1, @@ -162,11 +165,6 @@ typedef struct { int32_t memIdx; SMPTestMemInfo* pMemList; - - int64_t npSize; - int32_t npMemIdx; - SMPTestMemInfo* npMemList; - bool taskFreed; int32_t lastAct; } SMPTestTaskCtx; @@ -216,6 +214,7 @@ typedef struct SMPTestCtx { BoundedQueue* pJobQueue; SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM]; TdThread dropThreadFp; + TdThread nPoolThreadFp; int32_t jobNum; int64_t totalTaskNum; SMPTestJobCtx* jobCtxs; @@ -228,6 +227,9 @@ typedef struct SMPTestCtx { bool initDone; int8_t testDone; int64_t jobLoop; + + int32_t npIdx; + SMPTestMemInfo* npMemList; } SMPTestCtx; SMPTestCtx mptCtx = {0}; @@ -494,11 +496,7 @@ void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) { mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[taskIdx].taskId, 0, taskIdx, pJobCtx, pJobCtx->pSessions[taskIdx]); pJobCtx->pSessions[taskIdx] = NULL; - for (int32_t i = 0; i < pTask->npMemIdx; ++i) { - taosMemFreeClear(pTask->npMemList[i].p); - } taosMemFreeClear(pTask->pMemList); - taosMemFreeClear(pTask->npMemList); pTask->destoryed = true; } @@ -589,9 +587,6 @@ void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) { pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList)); ASSERT_TRUE(NULL != pJob->taskCtxs[idx].pMemList); - pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList)); - ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList); - pJob->taskCtxs[idx].destoryed = false; uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx); @@ -769,7 +764,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) { taosHashCancelIterate(mptCtx.pJobs, pJob); - uDebug("total %d jobs retired, retiredSize:%" PRId64, jobNum, retiredSize); + uDebug("total %d jobs retired, retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize); } @@ -1117,32 +1112,17 @@ void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask, int32_t actT } } -void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { - if (atomic_load_8(&pJobCtx->pJob->retired)) { +void mptSimulateOutTask(int64_t targetSize) { + SMPTestMemInfo* pCtx = &mptCtx.npMemList[mptCtx.npIdx]; + pCtx->size = targetSize; + pCtx->p = taosMemMalloc(pCtx->size); + if (NULL == pCtx->p) { + uError("non-pool sim malloc %" PRId64 " failed", pCtx->size); + pCtx->size = 0; return; } - if (taosRand() % 2) { - return; - } - - if (pTask->npMemIdx >= MPT_MAX_MEM_ACT_TIMES) { - return; - } - - pTask->npMemList[pTask->npMemIdx].size = taosRand() % mptCtrl.maxSingleAllocSize; - pTask->npMemList[pTask->npMemIdx].p = taosMemMalloc(pTask->npMemList[pTask->npMemIdx].size); - if (NULL == pTask->npMemList[pTask->npMemIdx].p) { - uError("JOB:0x%x TASK:0x%x out malloc %" PRId64 " failed", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size); - pTask->npMemList[pTask->npMemIdx].size = 0; - return; - } - - mptWriteMem(pTask->npMemList[pTask->npMemIdx].p, pTask->npMemList[pTask->npMemIdx].size); - - uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx); - - pTask->npMemIdx++; + mptWriteMem(pCtx->p, pCtx->size); } @@ -1171,9 +1151,7 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32 mptDisableMemoryPoolUsage(); } -// if (!atomic_load_8(&pJobCtx->pJob->retired)) { - mptSimulateOutTask(pJobCtx, pCtx); -// } + //mptSimulateOutTask(pJobCtx, pCtx); taosWUnLockLatch(&pCtx->taskExecLock); @@ -1372,6 +1350,39 @@ void* mptRunThreadFunc(void* param) { return NULL; } +void* mptNonPoolThreadFunc(void* param) { + int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT; + int64_t allocSize = 0; + int32_t loopTimes = 0; + + while (!atomic_load_8(&mptCtx.testDone)) { + mptSimulateOutTask(targetSize); + + MPT_PRINTF("%d:Non-pool malloc and write %" PRId64 " bytes, keep size:%" PRId64 "\n", loopTimes, targetSize, allocSize); + + taosMsleep(100); + taosMemFreeClear(mptCtx.npMemList[mptCtx.npIdx].p); + + loopTimes++; + + if (0 == (loopTimes % 100)) { + mptSimulateOutTask(MPT_NON_POOL_KEEP_ALLOC_UNIT); + allocSize += MPT_NON_POOL_KEEP_ALLOC_UNIT; + mptCtx.npIdx++; + } + + taosMsleep(100); + + if (loopTimes >= 5000) { + targetSize += MPT_NON_POOL_ALLOC_UNIT; + loopTimes = 0; + } + } + + return NULL; +} + + void* mptDropThreadFunc(void* param) { int32_t jobIdx = 0, taskIdx = 0, code = 0; uint64_t taskId = 0; @@ -1441,6 +1452,15 @@ void mptStartDropThread() { ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); } +void mptStartNonPoolThread() { + TdThreadAttr thattr; + ASSERT_EQ(0, taosThreadAttrInit(&thattr)); + ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE)); + ASSERT_EQ(0, taosThreadCreate(&mptCtx.nPoolThreadFp, &thattr, mptNonPoolThreadFunc, NULL)); + ASSERT_EQ(0, taosThreadAttrDestroy(&thattr)); +} + + @@ -1455,6 +1475,18 @@ void mptDestroyJobs() { } +void mptDestroyNonPoolCtx() { + for (int32_t i = 0; i < mptCtx.npIdx; ++i) { + taosMemFreeClear(mptCtx.npMemList[i].p); + } + taosMemFreeClear(mptCtx.npMemList); +} + +void mptInitNonPoolCtx() { + mptCtx.npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_NON_POOL_ALLOC_TIMES, sizeof(*mptCtx.npMemList)); + ASSERT_TRUE(NULL != mptCtx.npMemList); +} + void mptRunCase(SMPTestParam* param, int32_t times) { MPT_PRINTF("\t case start the %dth running\n", times); @@ -1469,6 +1501,8 @@ void mptRunCase(SMPTestParam* param, int32_t times) { mptInitJobs(); + mptInitNonPoolCtx(); + mptCtx.initDone = true; for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { @@ -1476,6 +1510,7 @@ void mptRunCase(SMPTestParam* param, int32_t times) { } mptStartDropThread(); + mptStartNonPoolThread(); for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) { (void)taosThreadJoin(mptCtx.threadCtxs[i].threadFp, NULL); @@ -1484,8 +1519,10 @@ void mptRunCase(SMPTestParam* param, int32_t times) { atomic_store_8(&mptCtx.testDone, 1); (void)taosThreadJoin(mptCtx.dropThreadFp, NULL); + (void)taosThreadJoin(mptCtx.nPoolThreadFp, NULL); mptDestroyJobs(); + mptDestroyNonPoolCtx(); taosMemPoolClose(gMemPoolHandle); @@ -1546,7 +1583,7 @@ TEST(PerfTest, GetSysAvail) { } #endif -#if 1 +#if 0 TEST(PerfTest, allocLatency) { char* caseName = "PerfTest:allocLatency"; int32_t code = 0; @@ -1878,7 +1915,7 @@ TEST(poolFuncTest, SingleThreadTest) { } #endif -#if 0 +#if 1 TEST(poolFuncTest, MultiThreadTest) { char* caseName = "poolFuncTest:MultiThreadTest"; SMPTestParam param = {0};