fix: job rerun retry latency issue

This commit is contained in:
dapan1121 2024-11-22 10:17:19 +08:00
parent 205d47d981
commit 9ffe0bcd8c
3 changed files with 17 additions and 17 deletions

View File

@ -675,7 +675,7 @@ int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask) {
continue; continue;
} }
SCH_ERR_RET(schLaunchTask(pJob, pTask)); SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
} }
} }

View File

@ -326,7 +326,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) { if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId); SCH_TASK_DLOG("all %d children task done, start to launch parent task 0x%" PRIx64, readyNum, parent->taskId);
SCH_ERR_RET(schLaunchTask(pJob, parent)); SCH_ERR_RET(schDelayLaunchTask(pJob, parent));
} }
} }
@ -768,7 +768,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask)); SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
} }
SCH_ERR_RET(schLaunchTask(pJob, pTask)); SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1343,7 +1343,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
for (int32_t i = 0; i < level->taskNum; ++i) { for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(level->subTasks, i); SSchTask *pTask = taosArrayGet(level->subTasks, i);
SCH_ERR_RET(schLaunchTask(pJob, pTask)); SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;

View File

@ -1125,6 +1125,8 @@ void mptSimulateOutTask(SMPTestJobCtx* pJobCtx, SMPTestTaskCtx* pTask) {
return; return;
} }
mptWriteMem(pTask->npMemList[pTask->npMemIdx].p, pTask->npMemList[pTask->npMemIdx].size);
uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx); uDebug("JOB:0x%x TASK:0x%x out malloced, size:%" PRId64 ", mIdx:%d", pJobCtx->jobId, pTask->taskId, pTask->npMemList[pTask->npMemIdx].size, pTask->npMemIdx);
pTask->npMemIdx++; pTask->npMemIdx++;
@ -1535,18 +1537,25 @@ TEST(PerfTest, allocLatency) {
assert(0 == taosMemPoolCallocJob(0, 0, (void**)&pJob)); assert(0 == taosMemPoolCallocJob(0, 0, (void**)&pJob));
assert(0 == taosMemPoolInitSession(gMemPoolHandle, &pSession, pJob, "id")); assert(0 == taosMemPoolInitSession(gMemPoolHandle, &pSession, pJob, "id"));
int32_t loopTimes = 1000000; int32_t loopTimes = 10000000;
int64_t st = taosGetTimestampUs(); int64_t st = taosGetTimestampUs();
mptEnableMemoryPoolUsage(gMemPoolHandle, pSession); mptEnableMemoryPoolUsage(gMemPoolHandle, pSession);
for (int32_t i = 0; i < loopTimes; ++i) { for (int32_t i = 0; i < loopTimes; ++i) {
p = mptMemoryMalloc(msize); p = (char*)mptMemoryMalloc(msize);
assert(0 == code); assert(0 == code);
} }
int64_t totalUs = taosGetTimestampUs() - st; int64_t totalUs1 = taosGetTimestampUs() - st;
mptDisableMemoryPoolUsage(); mptDisableMemoryPoolUsage();
st = taosGetTimestampUs();
for (int32_t i = 0; i < loopTimes; ++i) {
p = (char*)mptMemoryMalloc(msize);
assert(0 == code);
}
int64_t totalUs2 = taosGetTimestampUs() - st;
printf("%d times alloc %" PRId64 " bytes, total time:%" PRId64 "us, avg:%dus\n", loopTimes, totalUs, totalUs/loopTimes); printf("%d times alloc %" PRId64 " bytes, pool total time:%" PRId64 "us, avg:%fus VS direct total time:%" PRId64 "us, avg:%fus\n", loopTimes, msize, totalUs1, ((double)totalUs1)/loopTimes, totalUs2, ((double)totalUs2)/loopTimes);
} }
#endif #endif
@ -1585,16 +1594,7 @@ TEST(FuncTest, MultiThreadTest) {
} }
#endif #endif
#if 0
TEST(FuncTest, MultiThreadsTest) {
char* caseName = "FuncTest:MultiThreadsTest";
SMPTestParam param = {0};
for (int32_t i = 0; i < mptCtrl.caseLoopTimes; ++i) {
mptRunCase(&param);
}
}
#endif
#endif #endif