fix: task delay timer not closed issue

This commit is contained in:
dapan1121 2024-11-26 13:36:15 +08:00
parent 050a0dfe9f
commit 80b1ab8f45
4 changed files with 93 additions and 46 deletions

View File

@ -746,8 +746,10 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryRspPolicy", tsQueryRspPolicy, 0, 1, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "retentionSpeedLimitMB", tsRetentionSpeedLimitMB, 0, 1024, CFG_SCOPE_SERVER, CFG_DYN_NONE));
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "queryUseMemoryPool", tsQueryUseMemoryPool, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddBool(pCfg, "memPoolFullFunc", tsMemPoolFullFunc, CFG_SCOPE_SERVER, CFG_DYN_NONE) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "singleQueryMaxMemorySize", tsSingleQueryMaxMemorySize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
//TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "queryBufferPoolSize", tsQueryBufferPoolSize, 0, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "minReservedMemorySize", tsMinReservedMemorySize, 1024, 1000000000, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER) != 0);
@ -1708,6 +1710,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "queryUseMemoryPool");
tsQueryUseMemoryPool = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "memPoolFullFunc");
tsMemPoolFullFunc = pItem->bval;
TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "singleQueryMaxMemorySize");
tsSingleQueryMaxMemorySize = pItem->i32;
@ -2083,8 +2088,6 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) {
{"supportVnodes", &tsNumOfSupportVnodes},
{"experimental", &tsExperimental},
{"maxTsmaNum", &tsMaxTsmaNum},
{"singleQueryMaxMemorySize", &tsSingleQueryMaxMemorySize},
{"minReservedMemorySize", &tsMinReservedMemorySize},
{"safetyCheckLevel", &tsSafetyCheckLevel},
{"bypassFlag", &tsBypassFlag}};

View File

@ -588,11 +588,11 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) {
int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES);
if (0 != code) {
if (HASH_NODE_EXIST(code)) {
SCH_TASK_ELOG("task already in execTask list, code:%x", code);
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
SCH_TASK_DLOG("task already in execTask list, code:%x", code);
return TSDB_CODE_SUCCESS;
}
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:%d", errno);
SCH_TASK_ELOG("taosHashPut task to execTask list failed, errno:0x%x", errno);
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
}
@ -1312,6 +1312,11 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
pTask->delayLaunchPar.queryId = pJob->queryId;
pTask->delayLaunchPar.taskId = pTask->taskId;
if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXEC) {
SCH_ERR_RET(schPushTaskToExecList(pJob, pTask));
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_EXEC);
}
if (NULL == pTask->delayTimer) {
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)&pTask->delayLaunchPar, schMgmt.timer);
if (NULL == pTask->delayTimer) {

View File

@ -1016,6 +1016,8 @@ void mpUpdateSystemAvailableMemorySize() {
}
atomic_store_64(&tsCurrentAvailMemorySize, sysAvailSize);
uDebug("system available memory size: %" PRId64, sysAvailSize);
}
void* mpMgmtThreadFunc(void* param) {

View File

@ -56,6 +56,9 @@ namespace {
#define MPT_MIN_MEM_POOL_SIZE (1048576UL)
#define MPT_MAX_RETIRE_JOB_NUM 10000
#define MPT_DEFAULT_TASK_RUN_TIMES 10
#define MPT_NON_POOL_ALLOC_UNIT (256 * 1048576UL)
#define MPT_NON_POOL_KEEP_ALLOC_UNIT 10485760UL
#define MPT_MAX_NON_POOL_ALLOC_TIMES 30000
enum {
MPT_READ = 1,
@ -162,11 +165,6 @@ typedef struct {
int32_t memIdx;
SMPTestMemInfo* pMemList;
int64_t npSize;
int32_t npMemIdx;
SMPTestMemInfo* npMemList;
bool taskFreed;
int32_t lastAct;
} SMPTestTaskCtx;
@ -216,6 +214,7 @@ typedef struct SMPTestCtx {
BoundedQueue* pJobQueue;
SMPTestThread threadCtxs[MPT_MAX_THREAD_NUM];
TdThread dropThreadFp;
TdThread nPoolThreadFp;
int32_t jobNum;
int64_t totalTaskNum;
SMPTestJobCtx* jobCtxs;
@ -228,6 +227,9 @@ typedef struct SMPTestCtx {
bool initDone;
int8_t testDone;
int64_t jobLoop;
int32_t npIdx;
SMPTestMemInfo* npMemList;
} SMPTestCtx;
SMPTestCtx mptCtx = {0};
@ -494,11 +496,7 @@ void mptDestroyTaskCtx(SMPTestJobCtx* pJobCtx, int32_t taskIdx) {
mptDestroySession(pJobCtx->jobId, pJobCtx->taskCtxs[taskIdx].taskId, 0, taskIdx, pJobCtx, pJobCtx->pSessions[taskIdx]);
pJobCtx->pSessions[taskIdx] = NULL;
for (int32_t i = 0; i < pTask->npMemIdx; ++i) {
taosMemFreeClear(pTask->npMemList[i].p);
}
taosMemFreeClear(pTask->pMemList);
taosMemFreeClear(pTask->npMemList);
pTask->destoryed = true;
}
@ -589,9 +587,6 @@ void mptInitTask(int32_t idx, int32_t eId, SMPTestJobCtx* pJob) {
pJob->taskCtxs[idx].pMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].pMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].pMemList);
pJob->taskCtxs[idx].npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_MEM_ACT_TIMES, sizeof(*pJob->taskCtxs[idx].npMemList));
ASSERT_TRUE(NULL != pJob->taskCtxs[idx].npMemList);
pJob->taskCtxs[idx].destoryed = false;
uDebug("JOB:0x%x TASK:0x%x idx:%d initialized", pJob->jobId, pJob->taskCtxs[idx].taskId, idx);
@ -769,7 +764,7 @@ void mptRetireJobsCb(int64_t retireSize, int32_t errCode) {
taosHashCancelIterate(mptCtx.pJobs, pJob);
uDebug("total %d jobs retired, retiredSize:%" PRId64, jobNum, retiredSize);
uDebug("total %d jobs retired, retiredSize:%" PRId64 " targetRetireSize:%" PRId64, jobNum, retiredSize, retireSize);
}
@ -1117,32 +1112,17 @@ void mptSimulateTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask, int32_t actT
}
}
void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
if (atomic_load_8(&pJobCtx->pJob->retired)) {
void mptSimulateOutTask(int64_t targetSize) {
SMPTestMemInfo* pCtx = &mptCtx.npMemList[mptCtx.npIdx];
pCtx->size = targetSize;
pCtx->p = taosMemMalloc(pCtx->size);
if (NULL == pCtx->p) {
uError("non-pool sim malloc %" PRId64 " failed", pCtx->size);
pCtx->size = 0;
return;
}
if (taosRand() % 2) {
return;
}
if (pTask->npMemIdx >= MPT_MAX_MEM_ACT_TIMES) {
return;
}
pTask->npMemList[pTask->npMemIdx].size = taosRand() % mptCtrl.maxSingleAllocSize;
pTask->npMemList[pTask->npMemIdx].p = taosMemMalloc(pTask->npMemList[pTask->npMemIdx].size);
if (NULL == pTask->npMemList[pTask->npMemIdx].p) {
uError("JOB:0x%x TASK:0x%x out malloc %" PRId64 " failed", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size);
pTask->npMemList[pTask->npMemIdx].size = 0;
return;
}
mptWriteMem(pTask->npMemList[pTask->npMemIdx].p, pTask->npMemList[pTask->npMemIdx].size);
uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx);
pTask->npMemIdx++;
mptWriteMem(pCtx->p, pCtx->size);
}
@ -1171,9 +1151,7 @@ void mptTaskRun(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pCtx, int32_t idx, int32
mptDisableMemoryPoolUsage();
}
// if (!atomic_load_8(&pJobCtx->pJob->retired)) {
mptSimulateOutTask(pJobCtx, pCtx);
// }
//mptSimulateOutTask(pJobCtx, pCtx);
taosWUnLockLatch(&pCtx->taskExecLock);
@ -1372,6 +1350,39 @@ void* mptRunThreadFunc(void* param) {
return NULL;
}
void* mptNonPoolThreadFunc(void* param) {
int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT;
int64_t allocSize = 0;
int32_t loopTimes = 0;
while (!atomic_load_8(&mptCtx.testDone)) {
mptSimulateOutTask(targetSize);
MPT_PRINTF("%d:Non-pool malloc and write %" PRId64 " bytes, keep size:%" PRId64 "\n", loopTimes, targetSize, allocSize);
taosMsleep(100);
taosMemFreeClear(mptCtx.npMemList[mptCtx.npIdx].p);
loopTimes++;
if (0 == (loopTimes % 100)) {
mptSimulateOutTask(MPT_NON_POOL_KEEP_ALLOC_UNIT);
allocSize += MPT_NON_POOL_KEEP_ALLOC_UNIT;
mptCtx.npIdx++;
}
taosMsleep(100);
if (loopTimes >= 5000) {
targetSize += MPT_NON_POOL_ALLOC_UNIT;
loopTimes = 0;
}
}
return NULL;
}
void* mptDropThreadFunc(void* param) {
int32_t jobIdx = 0, taskIdx = 0, code = 0;
uint64_t taskId = 0;
@ -1441,6 +1452,15 @@ void mptStartDropThread() {
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
}
void mptStartNonPoolThread() {
TdThreadAttr thattr;
ASSERT_EQ(0, taosThreadAttrInit(&thattr));
ASSERT_EQ(0, taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE));
ASSERT_EQ(0, taosThreadCreate(&mptCtx.nPoolThreadFp, &thattr, mptNonPoolThreadFunc, NULL));
ASSERT_EQ(0, taosThreadAttrDestroy(&thattr));
}
@ -1455,6 +1475,18 @@ void mptDestroyJobs() {
}
void mptDestroyNonPoolCtx() {
for (int32_t i = 0; i < mptCtx.npIdx; ++i) {
taosMemFreeClear(mptCtx.npMemList[i].p);
}
taosMemFreeClear(mptCtx.npMemList);
}
void mptInitNonPoolCtx() {
mptCtx.npMemList = (SMPTestMemInfo*)taosMemoryCalloc(MPT_MAX_NON_POOL_ALLOC_TIMES, sizeof(*mptCtx.npMemList));
ASSERT_TRUE(NULL != mptCtx.npMemList);
}
void mptRunCase(SMPTestParam* param, int32_t times) {
MPT_PRINTF("\t case start the %dth running\n", times);
@ -1469,6 +1501,8 @@ void mptRunCase(SMPTestParam* param, int32_t times) {
mptInitJobs();
mptInitNonPoolCtx();
mptCtx.initDone = true;
for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) {
@ -1476,6 +1510,7 @@ void mptRunCase(SMPTestParam* param, int32_t times) {
}
mptStartDropThread();
mptStartNonPoolThread();
for (int32_t i = 0; i < mptCtx.param.threadNum; ++i) {
(void)taosThreadJoin(mptCtx.threadCtxs[i].threadFp, NULL);
@ -1484,8 +1519,10 @@ void mptRunCase(SMPTestParam* param, int32_t times) {
atomic_store_8(&mptCtx.testDone, 1);
(void)taosThreadJoin(mptCtx.dropThreadFp, NULL);
(void)taosThreadJoin(mptCtx.nPoolThreadFp, NULL);
mptDestroyJobs();
mptDestroyNonPoolCtx();
taosMemPoolClose(gMemPoolHandle);
@ -1546,7 +1583,7 @@ TEST(PerfTest, GetSysAvail) {
}
#endif
#if 1
#if 0
TEST(PerfTest, allocLatency) {
char* caseName = "PerfTest:allocLatency";
int32_t code = 0;
@ -1878,7 +1915,7 @@ TEST(poolFuncTest, SingleThreadTest) {
}
#endif
#if 0
#if 1
TEST(poolFuncTest, MultiThreadTest) {
char* caseName = "poolFuncTest:MultiThreadTest";
SMPTestParam param = {0};