diff --git a/include/util/tlog.h b/include/util/tlog.h index a6c87593d1..e31992394c 100644 --- a/include/util/tlog.h +++ b/include/util/tlog.h @@ -76,6 +76,7 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc); void taosCloseLog(); void taosResetLog(); void taosDumpData(uint8_t *msg, int32_t len); +void taosSetNoNewFile(); void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...) #ifdef __GNUC__ diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 5868f3d588..72914a963f 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -226,6 +226,7 @@ typedef struct SSchTimerParam { typedef struct SSchTask { uint64_t clientId; // current client id uint64_t taskId; // task id + uint64_t seriousId; SRWLatch lock; // task reentrant lock int32_t maxExecTimes; // task max exec times int32_t maxRetryTimes; // task max retry times diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 008078c017..9fb901800b 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -995,11 +995,26 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { - int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1); - if (0 != origInRetry) { - SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry); - return TSDB_CODE_SCH_IGNORE_ERROR; +int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, bool *inRetry) { + while (true) { + if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) { + SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId); + return TSDB_CODE_SCH_IGNORE_ERROR; + } + + int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1); + if (0 != origInRetry) { + SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry); + taosUsleep(1); + continue; + } + + if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) { + SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId); + return TSDB_CODE_SCH_IGNORE_ERROR; + } + + break; } *inRetry = true; @@ -1040,6 +1055,8 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) { SCH_RESET_JOB_LEVEL_IDX(pJob); atomic_add_fetch_64(&pJob->seriousId, 1); + + SCH_JOB_DLOG("update job sId to %" PRId64, pJob->seriousId); return TSDB_CODE_SUCCESS; } @@ -1055,7 +1072,7 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_ SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode)); - SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry)); + SCH_ERR_JRET(schResetJobForRetry(pJob, pTask, rspCode, &inRetry)); SCH_ERR_JRET(schLaunchJob(pJob)); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 73a56cf800..70c63e60b3 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -1143,7 +1143,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_VND_DELETE: { SVDeleteReq req = {0}; req.header.vgId = addr->nodeId; - req.sId = pJob->seriousId; + req.sId = pTask->seriousId; req.queryId = pJob->queryId; req.clientId = pTask->clientId; req.taskId = pTask->taskId; @@ -1177,7 +1177,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, SSubQueryMsg qMsg; qMsg.header.vgId = addr->nodeId; qMsg.header.contLen = 0; - qMsg.sId = pJob->seriousId; + qMsg.sId = pTask->seriousId; qMsg.queryId = pJob->queryId; qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; @@ -1233,7 +1233,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, case TDMT_SCH_MERGE_FETCH: { SResFetchReq req = {0}; req.header.vgId = addr->nodeId; - req.sId = pJob->seriousId; + req.sId = pTask->seriousId; req.queryId = pJob->queryId; req.clientId = pTask->clientId; req.taskId = pTask->taskId; @@ -1261,7 +1261,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, STaskDropReq qMsg; qMsg.header.vgId = addr->nodeId; qMsg.header.contLen = 0; - qMsg.sId = pJob->seriousId; + qMsg.sId = pTask->seriousId; qMsg.queryId = pJob->queryId; qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; @@ -1319,7 +1319,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr, STaskNotifyReq qMsg; qMsg.header.vgId = addr->nodeId; qMsg.header.contLen = 0; - qMsg.sId = pJob->seriousId; + qMsg.sId = pTask->seriousId; qMsg.queryId = pJob->queryId; qMsg.clientId = pTask->clientId; qMsg.taskId = pTask->taskId; diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index e79041e563..706879b0cf 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -308,7 +308,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { .type = QUERY_NODE_DOWNSTREAM_SOURCE, .clientId = pTask->clientId, .taskId = pTask->taskId, - .sId = pJob->seriousId, + .sId = pTask->seriousId, .execId = pTask->execId, .addr = pTask->succeedAddr, .fetchMsgType = SCH_FETCH_TYPE(pTask), @@ -1342,6 +1342,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { for (int32_t i = 0; i < level->taskNum; ++i) { SSchTask *pTask = taosArrayGet(level->subTasks, i); + pTask->seriousId = pJob->seriousId; SCH_ERR_RET(schDelayLaunchTask(pJob, pTask)); } diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index f70b145dbc..c1b96f6467 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -249,6 +249,10 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc) { return 0; } +void taosSetNoNewFile() { + tsLogObj.openInProgress = 1; +} + static void taosStopLog() { if (tsLogObj.logHandle) { tsLogObj.logHandle->stop = 1; diff --git a/source/util/test/memPoolTest.cpp b/source/util/test/memPoolTest.cpp index 5b9ddecfd1..add8d4a17e 100644 --- a/source/util/test/memPoolTest.cpp +++ b/source/util/test/memPoolTest.cpp @@ -391,6 +391,7 @@ void mptInitLogFile() { if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum, false) < 0) { MPT_PRINTF("failed to open log file in directory:%s\n", tsLogDir); } + taosSetNoNewFile(); } static bool mptJobMemSizeCompFn(void* l, void* r, void* param) { @@ -443,6 +444,8 @@ void mptInit() { ASSERT_TRUE(NULL != mptCtx.pSrcString); memset(mptCtx.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1); mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0; + + void* p = taosMemMalloc(1048576UL * 20000); } void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) { @@ -793,7 +796,7 @@ void mptInitPool(void) { assert(0 == taosMemoryPoolInit(mptRetireJobsCb, mptRetireJobCb)); } -void mptWriteMem(void* pStart, int32_t size) { +void mptWriteMem(void* pStart, int64_t size) { char* pEnd = (char*)pStart + size - 1; char* p = (char*)pStart; while (p <= pEnd) { @@ -1357,7 +1360,7 @@ void* mptRunThreadFunc(void* param) { } void* mptNonPoolThreadFunc(void* param) { - int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT; + int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT * 3; int64_t allocSize = 0; int32_t loopTimes = 0; @@ -1379,7 +1382,7 @@ void* mptNonPoolThreadFunc(void* param) { taosMsleep(100); - if (loopTimes >= 5000) { + if (loopTimes >= 4000) { targetSize += MPT_NON_POOL_ALLOC_UNIT; loopTimes = 0; }