From 9ffe0bcd8cc08599488d278f155d83175f631e9c Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 22 Nov 2024 10:17:19 +0800 Subject: [PATCH] fix: job rerun retry latency issue --- source/libs/scheduler/src/schJob.c | 2 +- source/libs/scheduler/src/schTask.c | 6 +++--- source/util/test/memPoolTest.cpp | 26 +++++++++++++------------- 3 files changed, 17 insertions(+), 17 deletions(-) diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 9c3ce047a9..008078c017 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -675,7 +675,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) { continue; } - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_RET(schDelayLaunchTask(pJob, pTask)); } } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 8494e60f70..6d3563243d 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -326,7 +326,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); - SCH_ERR_RET(schLaunchTask(pJob, parent)); + SCH_ERR_RET(schDelayLaunchTask(pJob, parent)); } } @@ -768,7 +768,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); } - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_RET(schDelayLaunchTask(pJob, pTask)); return TSDB_CODE_SUCCESS; } @@ -1343,7 +1343,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *pTask = taosArrayGet(level->subTasks, i); - SCH_ERR_RET(schLaunchTask(pJob, pTask)); + SCH_ERR_RET(schDelayLaunchTask(pJob, pTask)); } return TSDB_CODE_SUCCESS; diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index e5a3f203b8..dff7e9e578 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -1125,6 +1125,8 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) { return; } + mptWriteMem(pTask->npMemList[pTask->npMemIdx].p, pTask->npMemList[pTask->npMemIdx].size); + uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx); pTask->npMemIdx++; @@ -1535,18 +1537,25 @@ TEST(PerfTest, allocLatency) { assert(0 == taosMemPoolCallocJob(0, 0, (void**)&pJob)); assert(0 == taosMemPoolInitSession(gMemPoolHandle, &pSession, pJob, "id")); - int32_t loopTimes = 1000000; + int32_t loopTimes = 10000000; int64_t st = taosGetTimestampUs(); mptEnableMemoryPoolUsage(gMemPoolHandle, pSession); for (int32_t i = 0; i < loopTimes; ++i) { - p = mptMemoryMalloc(msize); + p = (char*)mptMemoryMalloc(msize); assert(0 == code); } - int64_t totalUs = taosGetTimestampUs() - st; + int64_t totalUs1 = taosGetTimestampUs() - st; mptDisableMemoryPoolUsage(); + + st = taosGetTimestampUs(); + for (int32_t i = 0; i < loopTimes; ++i) { + p = (char*)mptMemoryMalloc(msize); + assert(0 == code); + } + int64_t totalUs2 = taosGetTimestampUs() - st; - printf("%d times alloc %" PRId64 " bytes, total time:%" PRId64 "us, avg:%dus\n", loopTimes, totalUs, totalUs/loopTimes); + printf("%d times alloc %" PRId64 " bytes, pool total time:%" PRId64 "us, avg:%fus VS direct total time:%" PRId64 "us, avg:%fus\n", loopTimes, msize, totalUs1, ((double)totalUs1)/loopTimes, totalUs2, ((double)totalUs2)/loopTimes); } #endif @@ -1585,16 +1594,7 @@ TEST(FuncTest, MultiThreadTest) { } #endif -#if 0 -TEST(FuncTest, MultiThreadsTest) { - char* caseName = "FuncTest:MultiThreadsTest"; - SMPTestParam param = {0}; - for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) { - mptRunCase(¶m); - } -} -#endif #endif