diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index a7b682f49d..affa265d53 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -165,6 +165,7 @@ const SSchema* tGetTbnameColumnSchema(); bool tIsValidSchema(struct SSchema* pSchema, int32_t numOfCols, int32_t numOfTags); int32_t queryCreateTableMetaFromMsg(STableMetaRsp* msg, bool isSuperTable, STableMeta** pMeta); +char *jobTaskStatusStr(int32_t status); SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name); @@ -181,9 +182,9 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_TABLE_RECREATED) #define NEED_CLIENT_HANDLE_ERROR(_code) (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) -#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT) +#define NEED_SCHEDULER_RETRY_ERROR(_code) ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL) -#define REQUEST_MAX_RETRY_NUM 3 +#define REQUEST_MAX_TRY_TIMES 5 #define qFatal(...) \ do { \ diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 21ae132990..67001856e6 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -319,7 +319,7 @@ SRequestObj* execQuery(STscObj* pTscObj, const char* sql, int sqlLen) { int32_t retryNum = 0; int32_t code = 0; - while (retryNum++ < REQUEST_MAX_RETRY_NUM) { + while (retryNum++ < REQUEST_MAX_TRY_TIMES) { pRequest = execQueryImpl(pTscObj, sql, sqlLen); if (TSDB_CODE_SUCCESS == pRequest->code || !NEED_CLIENT_HANDLE_ERROR(pRequest->code)) { break; diff --git a/source/libs/qcom/src/queryUtil.c b/source/libs/qcom/src/queryUtil.c index 543a908226..44f1c454c9 100644 --- a/source/libs/qcom/src/queryUtil.c +++ b/source/libs/qcom/src/queryUtil.c @@ -162,6 +162,32 @@ int32_t asyncSendMsgToServer(void* pTransporter, SEpSet* epSet, int64_t* pTransp return TSDB_CODE_SUCCESS; } +char *jobTaskStatusStr(int32_t status) { + switch (status) { + case JOB_TASK_STATUS_NULL: + return "NULL"; + case JOB_TASK_STATUS_NOT_START: + return "NOT_START"; + case JOB_TASK_STATUS_EXECUTING: + return "EXECUTING"; + case JOB_TASK_STATUS_PARTIAL_SUCCEED: + return "PARTIAL_SUCCEED"; + case JOB_TASK_STATUS_SUCCEED: + return "SUCCEED"; + case JOB_TASK_STATUS_FAILED: + return "FAILED"; + case JOB_TASK_STATUS_CANCELLING: + return "CANCELLING"; + case JOB_TASK_STATUS_CANCELLED: + return "CANCELLED"; + case JOB_TASK_STATUS_DROPPING: + return "DROPPING"; + default: + break; + } + + return "UNKNOWN"; +} SSchema createSchema(uint8_t type, int32_t bytes, int32_t colId, const char* name) { SSchema s = {0}; diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index b5b8726a4c..4bd71f0fbd 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -145,7 +145,6 @@ typedef struct SQWorkerMgmt { void *timer; tmr_h hbTimer; SRWLatch schLock; - //SRWLatch ctxLock; SHashObj *schHash; //key: schedulerId, value: SQWSchStatus SHashObj *ctxHash; //key: queryId+taskId, value: SQWTaskCtx void *nodeObj; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 42890ab38a..468c744dba 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -11,6 +11,27 @@ SQWDebug gQWDebug = {0}; +char *qwPhaseStr(int32_t phase) { + switch (phase) { + case QW_PHASE_PRE_QUERY: + return "PRE_QUERY"; + case QW_PHASE_POST_QUERY: + return "POST_QUERY"; + case QW_PHASE_PRE_FETCH: + return "PRE_FETCH"; + case QW_PHASE_POST_FETCH: + return "POST_FETCH"; + case QW_PHASE_PRE_CQUERY: + return "PRE_CQUERY"; + case QW_PHASE_POST_CQUERY: + return "POST_CQUERY"; + default: + break; + } + + return "UNKNOWN"; +} + int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) { int32_t code = 0; @@ -72,7 +93,7 @@ int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) { break; default: - QW_TASK_ELOG("invalid task status:%d", oriStatus); + QW_TASK_ELOG("invalid task origStatus:%s", jobTaskStatusStr(oriStatus)); return TSDB_CODE_QRY_APP_ERROR; } @@ -80,7 +101,7 @@ int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) { _return: - QW_TASK_ELOG("invalid task status update from %d to %d", oriStatus, newStatus); + QW_TASK_ELOG("invalid task status update from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus)); QW_RET(code); } @@ -97,7 +118,7 @@ int32_t qwSetTaskStatus(QW_FPARAMS_DEF, SQWTaskStatus *task, int8_t status) { continue; } - QW_TASK_DLOG("task status updated from %d to %d", origStatus, status); + QW_TASK_DLOG("task status updated from %s to %s", jobTaskStatusStr(origStatus), jobTaskStatusStr(status)); break; } @@ -206,16 +227,18 @@ int32_t qwAddTaskStatusImpl(QW_FPARAMS_DEF, SQWSchStatus *sch, int32_t rwType, i if (rwType && task) { QW_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task)); } else { - QW_TASK_ELOG("task status already exist, id:%s", id); + QW_TASK_ELOG("task status already exist, newStatus:%s", jobTaskStatusStr(status)); QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST); } } else { - QW_TASK_ELOG("taosHashPut to tasksHash failed, code:%x", code); + QW_TASK_ELOG("taosHashPut to tasksHash failed, error:%x - %s", code, tstrerror(code)); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } QW_UNLOCK(QW_WRITE, &sch->tasksLock); + QW_TASK_DLOG("task status added, newStatus:%s", jobTaskStatusStr(status)); + if (rwType && task) { QW_ERR_RET(qwAcquireTaskStatus(QW_FPARAMS(), rwType, sch, task)); } @@ -252,10 +275,8 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); - //QW_LOCK(rwType, &mgmt->ctxLock); *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { - //QW_UNLOCK(rwType, &mgmt->ctxLock); QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } @@ -276,32 +297,28 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { return TSDB_CODE_SUCCESS; } -int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCtx **ctx) { +int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, SQWTaskCtx **ctx) { char id[sizeof(qId) + sizeof(tId)] = {0}; QW_SET_QTID(id, qId, tId); SQWTaskCtx nctx = {0}; - //QW_LOCK(QW_WRITE, &mgmt->ctxLock); int32_t code = taosHashPut(mgmt->ctxHash, id, sizeof(id), &nctx, sizeof(SQWTaskCtx)); if (0 != code) { - //QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); - if (HASH_NODE_EXIST(code)) { if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); } else if (ctx) { QW_RET(qwGetTaskCtx(QW_FPARAMS(), ctx)); } else { - QW_TASK_ELOG("task ctx already exist, id:%s", id); + QW_TASK_ELOG_E("task ctx already exist"); QW_ERR_RET(TSDB_CODE_QRY_TASK_ALREADY_EXIST); } } else { - QW_TASK_ELOG("taosHashPut to ctxHash failed, code:%x", code); + QW_TASK_ELOG("taosHashPut to ctxHash failed, error:%x", code); QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } } - //QW_UNLOCK(QW_WRITE, &mgmt->ctxLock); if (acquire && ctx) { QW_RET(qwAcquireTaskCtx(QW_FPARAMS(), ctx)); @@ -313,17 +330,15 @@ int32_t qwAddTaskCtxImpl(QW_FPARAMS_DEF, bool acquire, int32_t status, SQWTaskCt } int32_t qwAddTaskCtx(QW_FPARAMS_DEF) { - QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, NULL)); + QW_RET(qwAddTaskCtxImpl(QW_FPARAMS(), false, NULL)); } - - int32_t qwAddAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - return qwAddTaskCtxImpl(QW_FPARAMS(), true, 0, ctx); + return qwAddTaskCtxImpl(QW_FPARAMS(), true, ctx); } int32_t qwAddGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { - return qwAddTaskCtxImpl(QW_FPARAMS(), false, 0, ctx); + return qwAddTaskCtxImpl(QW_FPARAMS(), false, ctx); } @@ -638,7 +653,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void } if (DS_BUF_EMPTY == pOutput->bufStatus && pOutput->queryEnd) { - QW_SCH_TASK_DLOG("task all fetched, status:%d", JOB_TASK_STATUS_SUCCEED); + QW_TASK_DLOG_E("task all data fetched, done"); QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); } @@ -653,9 +668,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu void *dropConnection = NULL; void *cancelConnection = NULL; - QW_SCH_TASK_DLOG("start to handle event at phase %d", phase); - - output->needStop = false; + QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); if (QW_PHASE_PRE_QUERY == phase) { QW_ERR_JRET(qwAddAcquireTaskCtx(QW_FPARAMS(), &ctx)); @@ -671,7 +684,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu atomic_store_8(&ctx->phase, phase); if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_ELOG("task already cancelled/dropped at wrong phase, phase:%d", phase); + QW_TASK_ELOG("task already cancelled/dropped at wrong phase %s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; @@ -705,7 +718,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (ctx->rspCode) { - QW_TASK_ELOG("task already failed at wrong phase, code:%x, phase:%d", ctx->rspCode, phase); + QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); output->needStop = true; output->rspCode = ctx->rspCode; QW_ERR_JRET(output->rspCode); @@ -718,46 +731,46 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } case QW_PHASE_PRE_FETCH: { if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropped, phase:%d", phase); + QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%d", phase); + QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase); + QW_TASK_ELOG("drop event at wrong phase %s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase); + QW_TASK_ELOG("cancel event at wrong phase %s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } if (ctx->rspCode) { - QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase); + QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); output->needStop = true; output->rspCode = ctx->rspCode; QW_ERR_JRET(output->rspCode); } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { - QW_TASK_WLOG("last fetch not finished, phase:%d", phase); + QW_TASK_WLOG("last fetch not finished, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_DUPLICATTED_OPERATION; QW_ERR_JRET(TSDB_CODE_QRY_DUPLICATTED_OPERATION); } if (!QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_READY)) { - QW_TASK_ELOG("query rsp are not ready, phase:%d", phase); + QW_TASK_ELOG("query rsp are not ready, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_MSG_ERROR; QW_ERR_JRET(TSDB_CODE_QRY_TASK_MSG_ERROR); @@ -768,14 +781,14 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu atomic_store_8(&ctx->phase, phase); if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%d", phase); + QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); } if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropped, phase:%d", phase); + QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); @@ -810,7 +823,7 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu if (ctx->rspCode) { - QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase); + QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); output->needStop = true; output->rspCode = ctx->rspCode; QW_ERR_JRET(output->rspCode); @@ -842,15 +855,15 @@ _return: if (dropConnection) { qwBuildAndSendDropRsp(dropConnection, output->rspCode); - QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode); + QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); } if (cancelConnection) { qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); - QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode); + QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); } - QW_SCH_TASK_DLOG("end to handle event at phase %d", phase); + QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase)); QW_RET(code); } @@ -865,7 +878,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp void *dropConnection = NULL; void *cancelConnection = NULL; - QW_SCH_TASK_DLOG("start to handle event at phase %d", phase); + QW_TASK_DLOG("start to handle event at phase %s", qwPhaseStr(phase)); output->needStop = false; @@ -875,14 +888,14 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp locked = true; if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { - QW_TASK_WLOG("task already dropped, phase:%d", phase); + QW_TASK_WLOG("task already dropped, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_WLOG("task already cancelled, phase:%d", phase); + QW_TASK_WLOG("task already cancelled, phase:%s", qwPhaseStr(phase)); output->needStop = true; output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); @@ -931,7 +944,7 @@ int32_t qwHandlePostPhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inp } if (ctx->rspCode) { - QW_TASK_ELOG("task failed, code:%x, phase:%d", ctx->rspCode, phase); + QW_TASK_ELOG("task already failed at phase %s, error:%x - %s", qwPhaseStr(phase), ctx->rspCode, tstrerror(ctx->rspCode)); output->needStop = true; output->rspCode = ctx->rspCode; QW_ERR_JRET(output->rspCode); @@ -968,20 +981,20 @@ _return: if (readyConnection) { qwBuildAndSendReadyRsp(readyConnection, output->rspCode); - QW_TASK_DLOG("ready msg rsped, code:%x", output->rspCode); + QW_TASK_DLOG("ready msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); } if (dropConnection) { qwBuildAndSendDropRsp(dropConnection, output->rspCode); - QW_TASK_DLOG("drop msg rsped, code:%x", output->rspCode); + QW_TASK_DLOG("drop msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); } if (cancelConnection) { qwBuildAndSendCancelRsp(cancelConnection, output->rspCode); - QW_TASK_DLOG("cancel msg rsped, code:%x", output->rspCode); + QW_TASK_DLOG("cancel msg rsped, code:%x - %s", output->rspCode, tstrerror(output->rspCode)); } - QW_SCH_TASK_DLOG("end to handle event at phase %d", phase); + QW_TASK_DLOG("end to handle event at phase %s", qwPhaseStr(phase)); QW_RET(code); } diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index c67b3ef4a9..8db661f197 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -300,7 +300,7 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { QW_SCH_TASK_DLOG("processQuery start, node:%p, sql:%s", node, sql); tfree(sql); - QW_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType)); + QW_ERR_RET(qwProcessQuery(QW_FPARAMS(), &qwMsg, msg->taskType)); QW_SCH_TASK_DLOG("processQuery end, node:%p", node); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 2ccc18c176..1c40f255cf 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -28,8 +28,7 @@ extern "C" { #define SCHEDULE_DEFAULT_MAX_JOB_NUM 1000 #define SCHEDULE_DEFAULT_MAX_TASK_NUM 1000 -#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 20 // unit is TSDB_TABLE_NUM_UNIT - +#define SCHEDULE_DEFAULT_MAX_NODE_TABLE_NUM 200 // unit is TSDB_TABLE_NUM_UNIT #define SCH_MAX_CANDIDATE_EP_NUM TSDB_MAX_REPLICA @@ -113,6 +112,7 @@ typedef struct SSchTask { int32_t msgLen; // msg length int8_t status; // task status int32_t lastMsgType; // last sent msg type + int32_t tryTimes; // task already tried times SQueryNodeAddr succeedAddr; // task executed success node address int8_t candidateIdx; // current try condidation index SArray *candidateAddrs; // condidate node addresses, element is SQueryNodeAddr @@ -176,9 +176,12 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_TASK_STATUS(task, st) atomic_store_8(&(task)->status, st) #define SCH_GET_TASK_STATUS(task) atomic_load_8(&(task)->status) +#define SCH_GET_TASK_STATUS_STR(task) jobTaskStatusStr(SCH_GET_TASK_STATUS(task)) + #define SCH_SET_JOB_STATUS(job, st) atomic_store_8(&(job)->status, st) #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) +#define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job)) #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) @@ -193,6 +196,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_IS_LEVEL_UNFINISHED(_level) ((_level)->taskLaunchedNum < (_level)->taskNum) #define SCH_GET_CUR_EP(_addr) (&(_addr)->epSet.eps[(_addr)->epSet.inUse]) #define SCH_SWITCH_EPSET(_addr) ((_addr)->epSet.inUse = ((_addr)->epSet.inUse + 1) % (_addr)->epSet.numOfEps) +#define SCH_TASK_NUM_OF_EPS(_addr) ((_addr)->epSet.numOfEps) #define SCH_JOB_ELOG(param, ...) qError("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) #define SCH_JOB_DLOG(param, ...) qDebug("QID:0x%" PRIx64 " " param, pJob->queryId, __VA_ARGS__) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e292513f8e..189124ef23 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -105,7 +105,7 @@ static FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t msgType) { int32_t lastMsgType = SCH_GET_TASK_LASTMSG_TYPE(pTask); - + int32_t taskStatus = SCH_GET_TASK_STATUS(pTask); switch (msgType) { case TDMT_VND_CREATE_TABLE_RSP: case TDMT_VND_SUBMIT_RSP: @@ -118,14 +118,14 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } - if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType)); + if (taskStatus != JOB_TASK_STATUS_EXECUTING && taskStatus != JOB_TASK_STATUS_PARTIAL_SUCCEED) { + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%s, rspType:%s", jobTaskStatusStr(taskStatus), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: - SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%s", TMSG_INFO(msgType), jobTaskStatusStr(taskStatus)); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -193,7 +193,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { break; default: - SCH_JOB_ELOG("invalid job status:%d", oriStatus); + SCH_JOB_ELOG("invalid job status:%s", jobTaskStatusStr(oriStatus)); SCH_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); } @@ -201,7 +201,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { continue; } - SCH_JOB_DLOG("job status updated from %d to %d", oriStatus, newStatus); + SCH_JOB_DLOG("job status updated from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus)); break; } @@ -210,7 +210,7 @@ int32_t schCheckAndUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { _return: - SCH_JOB_ELOG("invalid job status update, from %d to %d", oriStatus, newStatus); + SCH_JOB_ELOG("invalid job status update, from %s to %s", jobTaskStatusStr(oriStatus), jobTaskStatusStr(newStatus)); SCH_ERR_RET(code); } @@ -503,7 +503,7 @@ int32_t schPushTaskToExecList(SSchJob *pJob, SSchTask *pTask) { int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { - SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); } else { SCH_TASK_DLOG("task removed from execTask list, numOfTasks:%d", taosHashGetSize(pJob->execTasks)); } @@ -513,7 +513,7 @@ int32_t schMoveTaskToSuccList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (HASH_NODE_EXIST(code)) { *moved = true; - SCH_TASK_ELOG("task already in succTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("task already in succTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -532,7 +532,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { *moved = false; if (0 != taosHashRemove(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId))) { - SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_WLOG("remove task from execTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); } int32_t code = taosHashPut(pJob->failTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); @@ -540,7 +540,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (HASH_NODE_EXIST(code)) { *moved = true; - SCH_TASK_WLOG("task already in failTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_WLOG("task already in failTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -558,7 +558,7 @@ int32_t schMoveTaskToFailList(SSchJob *pJob, SSchTask *pTask, bool *moved) { int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (0 != taosHashRemove(pJob->succTasks, &pTask->taskId, sizeof(pTask->taskId))) { - SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_WLOG("remove task from succTask list failed, may not exist, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); } int32_t code = taosHashPut(pJob->execTasks, &pTask->taskId, sizeof(pTask->taskId), &pTask, POINTER_BYTES); @@ -566,7 +566,7 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { if (HASH_NODE_EXIST(code)) { *moved = true; - SCH_TASK_ELOG("task already in execTask list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("task already in execTask list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -583,26 +583,48 @@ int32_t schMoveTaskToExecList(SSchJob *pJob, SSchTask *pTask, bool *moved) { int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry) { - // TODO set retry or not based on task type/errCode/retry times/job status/available eps... + int8_t status = 0; + ++pTask->tryTimes; + + if (schJobNeedToStop(pJob, &status)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry cause of job status, job status:%s", jobTaskStatusStr(status)); + return TSDB_CODE_SUCCESS; + } - *needRetry = false; - - return TSDB_CODE_SUCCESS; + if (pTask->tryTimes >= REQUEST_MAX_TRY_TIMES) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since reach max try times, tryTimes:%d", pTask->tryTimes); + return TSDB_CODE_SUCCESS; + } + + if (!NEED_SCHEDULER_RETRY_ERROR(errCode)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry cause of errCode, errCode:%x - %s", errCode, tstrerror(errCode)); + return TSDB_CODE_SUCCESS; + } //TODO CHECK epList/condidateList - if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { - + if (SCH_IS_DATA_SRC_TASK(pTask)) { + if (pTask->tryTimes >= SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since all ep tried, tryTimes:%d, epNum:%d", pTask->tryTimes, SCH_TASK_NUM_OF_EPS(&pTask->plan->execNode)); + return TSDB_CODE_SUCCESS; + } } else { int32_t candidateNum = taosArrayGetSize(pTask->candidateAddrs); if ((pTask->candidateIdx + 1) >= candidateNum) { + *needRetry = false; + SCH_TASK_DLOG("task no more retry since all candiates tried, candidateIdx:%d, candidateNum:%d", pTask->candidateIdx, candidateNum); return TSDB_CODE_SUCCESS; } - - ++pTask->candidateIdx; } - + *needRetry = true; + SCH_TASK_DLOG("task need the %dth retry, errCode:%x - %s", pTask->tryTimes, errCode, tstrerror(errCode)); + + return TSDB_CODE_SUCCESS; } int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { @@ -613,7 +635,7 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) { SCH_ERR_RET(schLaunchTasksInFlowCtrlList(pJob, pTask)); } - if (SCH_IS_DATA_SRC_QRY_TASK(pTask)) { + if (SCH_IS_DATA_SRC_TASK(pTask)) { SCH_SWITCH_EPSET(&pTask->plan->execNode); } else { ++pTask->candidateIdx; @@ -762,7 +784,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_DLOG("task failed not processed cause of job status, job status:%d", status); + SCH_TASK_DLOG("task failed not processed cause of job status, job status:%s", jobTaskStatusStr(status)); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -781,7 +803,7 @@ int32_t schProcessOnTaskFailure(SSchJob *pJob, SSchTask *pTask, int32_t errCode) if (SCH_GET_TASK_STATUS(pTask) == JOB_TASK_STATUS_EXECUTING) { SCH_ERR_JRET(schMoveTaskToFailList(pJob, pTask, &moved)); } else { - SCH_TASK_ELOG("task not in executing list, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("task not in executing list, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_JRET(TSDB_CODE_SCH_STATUS_ERROR); } @@ -816,7 +838,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { bool moved = false; int32_t code = 0; - SCH_TASK_DLOG("taskOnSuccess, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_DLOG("taskOnSuccess, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_JRET(schMoveTaskToSuccList(pJob, pTask, &moved)); @@ -925,7 +947,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_ELOG("rsp not processed cause of job status, job status:%d", status); + SCH_TASK_ELOG("rsp not processed cause of job status, job status:%s, rspCode:0x%x", jobTaskStatusStr(status), rspCode); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -1034,7 +1056,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } default: - SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%s", msgType, SCH_GET_TASK_STATUS_STR(pTask)); SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1414,7 +1436,7 @@ int32_t schLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) { atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1); if (schJobNeedToStop(pJob, &status)) { - SCH_TASK_DLOG("no need to launch task cause of job status, job status:%d", status); + SCH_TASK_DLOG("no need to launch task cause of job status, job status:%s", jobTaskStatusStr(status)); SCH_RET(atomic_load_32(&pJob->errCode)); } @@ -1496,14 +1518,14 @@ int32_t schLaunchJob(SSchJob *pJob) { void schDropTaskOnExecutedNode(SSchJob *pJob, SSchTask *pTask) { if (NULL == pTask->execAddrs) { - SCH_TASK_DLOG("no exec address, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_DLOG("no exec address, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); return; } int32_t size = (int32_t)taosArrayGetSize(pTask->execAddrs); if (size <= 0) { - SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%d", SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_DLOG("task has no exec address, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask)); return; } @@ -1652,11 +1674,11 @@ static int32_t schExecJobImpl(void *transport, SArray *pNodeList, SQueryPlan* pD *job = pJob->refId; if (syncSchedule) { - SCH_JOB_DLOG("will wait for rsp now, job status:%d", SCH_GET_JOB_STATUS(pJob)); + SCH_JOB_DLOG("will wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); tsem_wait(&pJob->rspSem); } - SCH_JOB_DLOG("job exec done, job status:%d", SCH_GET_JOB_STATUS(pJob)); + SCH_JOB_DLOG("job exec done, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); schReleaseJob(pJob->refId); @@ -1874,13 +1896,13 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { int8_t status = SCH_GET_JOB_STATUS(pJob); if (status == JOB_TASK_STATUS_DROPPING) { - SCH_JOB_ELOG("job is dropping, status:%d", status); + SCH_JOB_ELOG("job is dropping, status:%s", jobTaskStatusStr(status)); schReleaseJob(job); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } if (!SCH_JOB_NEED_FETCH(pJob)) { - SCH_JOB_ELOG("no need to fetch data, status:%d", SCH_GET_JOB_STATUS(pJob)); + SCH_JOB_ELOG("no need to fetch data, status:%s", SCH_GET_JOB_STATUS_STR(pJob)); schReleaseJob(job); SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } @@ -1892,10 +1914,10 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { } if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - SCH_JOB_ELOG("job failed or dropping, status:%d", status); + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); } else if (status == JOB_TASK_STATUS_SUCCEED) { - SCH_JOB_DLOG("job already succeed, status:%d", status); + SCH_JOB_DLOG("job already succeed, status:%s", jobTaskStatusStr(status)); goto _return; } else if (status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(pJob)); @@ -1906,7 +1928,7 @@ int32_t schedulerFetchRows(int64_t job, void** pData) { status = SCH_GET_JOB_STATUS(pJob); if (JOB_TASK_STATUS_FAILED == status || JOB_TASK_STATUS_DROPPING == status) { - SCH_JOB_ELOG("job failed or dropping, status:%d", status); + SCH_JOB_ELOG("job failed or dropping, status:%s", jobTaskStatusStr(status)); SCH_ERR_JRET(atomic_load_32(&pJob->errCode)); }