fix: job retry issue

This commit is contained in:
dapan1121 2024-11-27 09:50:00 +08:00
parent 3d3956efda
commit 30eb4b2633
7 changed files with 42 additions and 15 deletions

View File

@ -76,6 +76,7 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc);
void taosCloseLog();
void taosResetLog();
void taosDumpData(uint8_t *msg, int32_t len);
void taosSetNoNewFile();
void taosPrintLog(const char *flags, int32_t level, int32_t dflag, const char *format, ...)
#ifdef __GNUC__

View File

@ -226,6 +226,7 @@ typedef struct SSchTimerParam {
typedef struct SSchTask {
uint64_t clientId; // current client id
uint64_t taskId; // task id
uint64_t seriousId;
SRWLatch lock; // task reentrant lock
int32_t maxExecTimes; // task max exec times
int32_t maxRetryTimes; // task max retry times

View File

@ -995,11 +995,26 @@ int32_t schChkResetJobRetry(SSchJob *pJob, int32_t rspCode) {
return TSDB_CODE_SUCCESS;
}
int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
if (0 != origInRetry) {
SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
return TSDB_CODE_SCH_IGNORE_ERROR;
int32_t schResetJobForRetry(SSchJob *pJob, SSchTask *pTask, int32_t rspCode, bool *inRetry) {
while (true) {
if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) {
SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId);
return TSDB_CODE_SCH_IGNORE_ERROR;
}
int8_t origInRetry = atomic_val_compare_exchange_8(&pJob->inRetry, 0, 1);
if (0 != origInRetry) {
SCH_JOB_DLOG("job already in retry, origInRetry: %d", pJob->inRetry);
taosUsleep(1);
continue;
}
if (pTask->seriousId < atomic_load_64(&pJob->seriousId)) {
SCH_TASK_DLOG("task sId %" PRId64 " is smaller than current job sId %" PRId64, pTask->seriousId, pJob->seriousId);
return TSDB_CODE_SCH_IGNORE_ERROR;
}
break;
}
*inRetry = true;
@ -1040,6 +1055,8 @@ int32_t schResetJobForRetry(SSchJob *pJob, int32_t rspCode, bool *inRetry) {
SCH_RESET_JOB_LEVEL_IDX(pJob);
atomic_add_fetch_64(&pJob->seriousId, 1);
SCH_JOB_DLOG("update job sId to %" PRId64, pJob->seriousId);
return TSDB_CODE_SUCCESS;
}
@ -1055,7 +1072,7 @@ int32_t schHandleJobRetry(SSchJob *pJob, SSchTask *pTask, SDataBuf *pMsg, int32_
SCH_TASK_DLOG("start to redirect all job tasks cause of error: %s", tstrerror(rspCode));
SCH_ERR_JRET(schResetJobForRetry(pJob, rspCode, &inRetry));
SCH_ERR_JRET(schResetJobForRetry(pJob, pTask, rspCode, &inRetry));
SCH_ERR_JRET(schLaunchJob(pJob));

View File

@ -1143,7 +1143,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_VND_DELETE: {
SVDeleteReq req = {0};
req.header.vgId = addr->nodeId;
req.sId = pJob->seriousId;
req.sId = pTask->seriousId;
req.queryId = pJob->queryId;
req.clientId = pTask->clientId;
req.taskId = pTask->taskId;
@ -1177,7 +1177,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
SSubQueryMsg qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = pJob->seriousId;
qMsg.sId = pTask->seriousId;
qMsg.queryId = pJob->queryId;
qMsg.clientId = pTask->clientId;
qMsg.taskId = pTask->taskId;
@ -1233,7 +1233,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
case TDMT_SCH_MERGE_FETCH: {
SResFetchReq req = {0};
req.header.vgId = addr->nodeId;
req.sId = pJob->seriousId;
req.sId = pTask->seriousId;
req.queryId = pJob->queryId;
req.clientId = pTask->clientId;
req.taskId = pTask->taskId;
@ -1261,7 +1261,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
STaskDropReq qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = pJob->seriousId;
qMsg.sId = pTask->seriousId;
qMsg.queryId = pJob->queryId;
qMsg.clientId = pTask->clientId;
qMsg.taskId = pTask->taskId;
@ -1319,7 +1319,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
STaskNotifyReq qMsg;
qMsg.header.vgId = addr->nodeId;
qMsg.header.contLen = 0;
qMsg.sId = pJob->seriousId;
qMsg.sId = pTask->seriousId;
qMsg.queryId = pJob->queryId;
qMsg.clientId = pTask->clientId;
qMsg.taskId = pTask->taskId;

View File

@ -308,7 +308,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
.type = QUERY_NODE_DOWNSTREAM_SOURCE,
.clientId = pTask->clientId,
.taskId = pTask->taskId,
.sId = pJob->seriousId,
.sId = pTask->seriousId,
.execId = pTask->execId,
.addr = pTask->succeedAddr,
.fetchMsgType = SCH_FETCH_TYPE(pTask),
@ -1342,6 +1342,7 @@ int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
for (int32_t i = 0; i < level->taskNum; ++i) {
SSchTask *pTask = taosArrayGet(level->subTasks, i);
pTask->seriousId = pJob->seriousId;
SCH_ERR_RET(schDelayLaunchTask(pJob, pTask));
}

View File

@ -249,6 +249,10 @@ int32_t taosInitLog(const char *logName, int32_t maxFiles, bool tsc) {
return 0;
}
void taosSetNoNewFile() {
tsLogObj.openInProgress = 1;
}
static void taosStopLog() {
if (tsLogObj.logHandle) {
tsLogObj.logHandle->stop = 1;

View File

@ -391,6 +391,7 @@ void mptInitLogFile() {
if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum, false) < 0) {
MPT_PRINTF("failed to open log file in directory:%s\n", tsLogDir);
}
taosSetNoNewFile();
}
static bool mptJobMemSizeCompFn(void* l, void* r, void* param) {
@ -443,6 +444,8 @@ void mptInit() {
ASSERT_TRUE(NULL != mptCtx.pSrcString);
memset(mptCtx.pSrcString, 'P', mptCtrl.maxSingleAllocSize - 1);
mptCtx.pSrcString[mptCtrl.maxSingleAllocSize - 1] = 0;
void* p = taosMemMalloc(1048576UL * 20000);
}
void mptDestroySession(uint64_t qId, int64_t tId, int32_t eId, int32_t taskIdx, SMPTestJobCtx* pJobCtx, void* session) {
@ -793,7 +796,7 @@ void mptInitPool(void) {
assert(0 == taosMemoryPoolInit(mptRetireJobsCb, mptRetireJobCb));
}
void mptWriteMem(void* pStart, int32_t size) {
void mptWriteMem(void* pStart, int64_t size) {
char* pEnd = (char*)pStart + size - 1;
char* p = (char*)pStart;
while (p <= pEnd) {
@ -1357,7 +1360,7 @@ void* mptRunThreadFunc(void* param) {
}
void* mptNonPoolThreadFunc(void* param) {
int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT;
int64_t targetSize = MPT_NON_POOL_ALLOC_UNIT * 3;
int64_t allocSize = 0;
int32_t loopTimes = 0;
@ -1379,7 +1382,7 @@ void* mptNonPoolThreadFunc(void* param) {
taosMsleep(100);
if (loopTimes >= 5000) {
if (loopTimes >= 4000) {
targetSize += MPT_NON_POOL_ALLOC_UNIT;
loopTimes = 0;
}