From 2fbf082f7d7f9a00a7b72047d256a39021001c14 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Fri, 18 Nov 2022 19:55:15 +0800 Subject: [PATCH 1/2] enh: support client redirect processing --- include/libs/qcom/query.h | 11 +- include/util/taoserror.h | 1 + source/libs/scheduler/inc/schInt.h | 22 +++- source/libs/scheduler/src/schTask.c | 148 ++++++++++++++++++++++---- source/libs/scheduler/src/schUtil.c | 21 ++++ source/libs/scheduler/src/scheduler.c | 6 ++ source/util/src/terror.c | 1 + 7 files changed, 185 insertions(+), 25 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 96ccef7164..f51aa88485 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -259,9 +259,15 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_CLIENT_HANDLE_ERROR(_code) \ (NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \ NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code)) + +#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) +#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR) +#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later + #define NEED_REDIRECT_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \ - (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \ + (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \ + SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \ (_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK) #define NEED_CLIENT_RM_TBLMETA_REQ(_type) \ @@ -270,7 +276,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t #define NEED_SCHEDULER_REDIRECT_ERROR(_code) \ ((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \ - (_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY) + SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \ + SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_APP_NOT_READY) #define REQUEST_TOTAL_EXEC_TIMES 2 diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 636decc60b..26f22f398c 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -92,6 +92,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0129) #define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A) #define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B) +#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C) //client #define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200) diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 6884824ba9..7cd8d608c2 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -145,7 +145,8 @@ typedef struct SSchedulerMgmt { bool exit; int32_t jobRef; int32_t jobNum; - SSchStat stat; + SSchStat stat; + void *timer; SRWLatch hbLock; SHashObj *hbConnections; void *queryMgmt; @@ -202,12 +203,30 @@ typedef struct SSchTaskProfile { int64_t endTs; } SSchTaskProfile; +typedef struct SSchRedirectCtx { + int32_t periodMs; + bool inRedirect; + int32_t totalTimes; + int32_t roundTotal; + int32_t roundTimes; // retry times in current round + int64_t startTs; +} SSchRedirectCtx; + +typedef struct SSchTimerParam { + int64_t rId; + uint64_t queryId; + uint64_t taskId; +} SSchTimerParam; + typedef struct SSchTask { uint64_t taskId; // task id SRWLatch lock; // task reentrant lock int32_t maxExecTimes; // task max exec times int32_t maxRetryTimes; // task max retry times int32_t retryTimes; // task retry times + int32_t delayExecMs; // task execution delay time + tmr_h delayTimer; // task delay execution timer + SSchRedirectCtx redirectCtx; // task redirect context bool waitRetry; // wait for retry int32_t execId; // task current execute index SSchLevel *level; // level @@ -529,6 +548,7 @@ int32_t schJobFetchRows(SSchJob *pJob); int32_t schJobFetchRowsA(SSchJob *pJob); int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId); int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList); +char *schDumpEpSet(SEpSet *pEpSet); char *schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 7e5b3faedb..9262a9257c 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -340,6 +340,67 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } +int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { + SSchRedirectCtx *pCtx = &pTask->redirectCtx; + if (!pCtx->inRedirect) { + pCtx->inRedirect = true; + pCtx->periodMs = tsRedirectPeriod; + pCtx->startTs = taosGetTimestampMs(); + + if (SCH_IS_DATA_BIND_TASK(pTask)) { + if (pEpSet) { + pCtx->roundTotal = pEpSet->numOfEps; + } else { + SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); + pCtx->roundTotal = pAddr->epSet.numOfEps; + } + } else { + pCtx->roundTotal = 1; + } + + goto _return; + } + + pCtx->totalTimes++; + + int64_t nowTs = taosGetTimestampMs(); + if ((nowTs - pCtx->startTs) > tsMaxRetryWaitTime) { + SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", + nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); + SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); + } + + if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) { + pCtx->roundTotal = pEpSet->numOfEps; + pCtx->roundTimes = 0; + + pTask->delayExecMs = 0; + + goto _return; + } + + pCtx->roundTimes++; + + if (pCtx->roundTimes >= pCtx->roundTotal) { + pCtx->periodMs *= tsRedirectFactor; + if (pCtx->periodMs > tsRedirectMaxPeriod) { + pCtx->periodMs = tsRedirectMaxPeriod; + } + + pTask->delayExecMs = pCtx->periodMs; + + goto _return; + } + + pTask->delayExecMs = 0; + +_return: + + SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal, pCtx->totalTimes, pTask->delayExecMs); + + return TSDB_CODE_SUCCESS; +} + int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) { int32_t code = 0; @@ -349,14 +410,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 pTask->retryTimes = 0; } - if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) { - SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes, - pTask->maxExecTimes, pTask->execId); - schHandleJobFailure(pJob, rspCode); - return TSDB_CODE_SUCCESS; - } + SCH_ERR_JRET(schChkUpdateRedirectCtx(pTask, pData ? pData->pEpSet : NULL)); pTask->waitRetry = true; + schDropTaskOnExecNode(pJob, pTask); taosHashClear(pTask->execNodes); schRemoveTaskFromExecList(pJob, pTask); @@ -368,8 +425,12 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr)); if (SCH_IS_DATA_BIND_TASK(pTask)) { - if (pData) { + if (pData && pData->pEpSet) { SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet)); + } else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SCH_SWITCH_EPSET(addr); + SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); } if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { @@ -380,7 +441,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT); - SCH_ERR_JRET(schLaunchTask(pJob, pTask)); + SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask)); return TSDB_CODE_SUCCESS; } @@ -428,28 +489,24 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC); } - if (SCH_IS_DATA_BIND_TASK(pTask)) { + if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) { if (NULL == pData->pEpSet) { - SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode)); - code = rspCode; + SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode)); + code = TSDB_CODE_INVALID_MSG; goto _return; } } code = schDoTaskRedirect(pJob, pTask, pData, rspCode); - taosMemoryFree(pData->pData); - taosMemoryFree(pData->pEpSet); - pData->pData = NULL; - pData->pEpSet = NULL; + taosMemoryFreeClear(pData->pData); + taosMemoryFreeClear(pData->pEpSet); SCH_RET(code); _return: - taosMemoryFree(pData->pData); - taosMemoryFree(pData->pEpSet); - pData->pData = NULL; - pData->pEpSet = NULL; + taosMemoryFreeClear(pData->pData); + taosMemoryFreeClear(pData->pEpSet); SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } @@ -715,10 +772,10 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0); - SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse]; - SEp *pNew = &pEpSet->eps[pEpSet->inUse]; + char *origEpset = schDumpEpSet(&pAddr->epSet); + char *newEpset = schDumpEpSet(pEpSet); - SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port); + SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset); memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet)); @@ -1078,6 +1135,53 @@ _return: SCH_RET(schProcessOnTaskFailure(pJob, pTask, code)); } +void schHandleTimerEvent(void *param, void *tmrId) { + SSchTimerParam *pTimerParam = (SSchTimerParam *)param; + SSchTask *pTask = NULL; + SSchJob *pJob = NULL; + int32_t code = 0; + + SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)); + + SCH_ERR_JRET(schLaunchTask(pJob, pTask)); + + return; + +_return: + + schHandleJobFailure(pJob, code); +} + +int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { + if (pTask->delayExecMs > 0) { + SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam)); + if (NULL == param) { + SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam)); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + param->rId = pJob->refId; + param->queryId = pJob->queryId; + param->taskId = pTask->taskId; + + if (NULL == pTask->delayTimer) { + pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer); + if (NULL == pTask->delayTimer) { + SCH_TASK_ELOG("start delay timer failed"); + QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + return TSDB_CODE_SUCCESS; + } + + taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void*)param, schMgmt.timer, &pTask->delayTimer); + + return TSDB_CODE_SUCCESS; + } + + SCH_RET(schLaunchTask(pJob, pTask)); +} + int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) { SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level)); diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 1f1288fcfd..a5a15cd1c6 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -36,6 +36,27 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) { return taosReleaseRef(schMgmt.jobRef, refId); } +char *schDumpEpSet(SEpSet *pEpSet) { + if (NULL == pEpSet) { + return NULL; + } + + int32_t maxSize = 1024; + char *str = taosMemoryMalloc(maxSize); + if (NULL == str) { + return NULL; + } + + int32_t n = 0; + n += snprintf(str + n, maxSize - n, "numOfEps:%d, inUse:%d eps:", pEpSet->numOfEps, pEpSet->inUse); + for (int32_t i = 0; i < pEpSet->numOfEps; ++i) { + SEp *pEp = &pEpSet->eps[i]; + n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port); + } + + return str; +} + char *schGetOpStr(SCH_OP_TYPE type) { switch (type) { case SCH_OP_NULL: diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index c4b45649a4..91c3b5d7a1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -48,6 +48,12 @@ int32_t schedulerInit() { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + schMgmt.timer = taosTmrInit(0, 0, 0, "scheduler"); + if (NULL == schMgmt.timer) { + qError("init timer failed, error:%s", tstrerror(terrno)); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) { qError("generate schdulerId failed, errno:%d", errno); SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR); diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 43602a607a..ef1cd6556c 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error") TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk") TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found") TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space") +TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout") //client TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation") From 072a73ec3d1d2b3547bc1ba8b2adec3d553978e4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 21 Nov 2022 11:35:52 +0800 Subject: [PATCH 2/2] enh: support max retry wait time configuration --- include/common/tglobal.h | 4 +++ source/common/src/tglobal.c | 8 +++++ source/libs/scheduler/inc/schInt.h | 2 ++ source/libs/scheduler/src/schRemote.c | 11 ++++-- source/libs/scheduler/src/schTask.c | 49 ++++++++++++++++----------- 5 files changed, 51 insertions(+), 23 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 2076906f70..d23b1f519a 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -90,6 +90,10 @@ extern int32_t tsQueryNodeChunkSize; extern bool tsQueryUseNodeAllocator; extern bool tsKeepColumnName; extern bool tsEnableQueryHb; +extern int32_t tsRedirectPeriod; +extern int32_t tsRedirectFactor; +extern int32_t tsRedirectMaxPeriod; +extern int32_t tsMaxRetryWaitTime; // client extern int32_t tsMinSlidingTime; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 27dcbd5be3..cb82ad300b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -87,6 +87,10 @@ bool tsQueryPlannerTrace = false; int32_t tsQueryNodeChunkSize = 32 * 1024; bool tsQueryUseNodeAllocator = true; bool tsKeepColumnName = false; +int32_t tsRedirectPeriod = 100; +int32_t tsRedirectFactor = 5; +int32_t tsRedirectMaxPeriod = 10000; +int32_t tsMaxRetryWaitTime = 60000; /* * denote if the server needs to compress response message at the application layer to client, including query rsp, @@ -301,6 +305,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; + if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1; tsNumOfTaskQueueThreads = tsNumOfCores / 2; tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4); @@ -645,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32; tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32; + tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; return 0; } @@ -860,6 +866,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32; } else if (strcasecmp("maxMemUsedByInsert", name) == 0) { tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; + } else if (strcasecmp("maxRetryWaitTime", name) == 0) { + tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32; } break; } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 7cd8d608c2..83b58a77d2 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -27,6 +27,7 @@ extern "C" { #include "tarray.h" #include "thash.h" #include "trpc.h" +#include "ttimer.h" enum { SCH_READ = 1, @@ -507,6 +508,7 @@ extern SSchedulerMgmt schMgmt; void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask); void schCleanClusterHb(void *pTrans); int32_t schLaunchTask(SSchJob *job, SSchTask *task); +int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask); int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType); SSchJob *schAcquireJob(int64_t refId); int32_t schReleaseJob(int64_t refId); diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index a6a2a6c301..4ebe07cf58 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -887,9 +887,14 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo)); SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask)); - qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, - epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); - + if (pJob && pTask) { + SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, + epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); + } else { + qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId, + epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle); + } + if (pTask) { pTask->lastMsgType = msgType; } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 9262a9257c..0b235e63e9 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -340,7 +340,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) { return TSDB_CODE_SUCCESS; } -int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { +int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) { SSchRedirectCtx *pCtx = &pTask->redirectCtx; if (!pCtx->inRedirect) { pCtx->inRedirect = true; @@ -362,13 +362,6 @@ int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { } pCtx->totalTimes++; - - int64_t nowTs = taosGetTimestampMs(); - if ((nowTs - pCtx->startTs) > tsMaxRetryWaitTime) { - SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", - nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); - SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); - } if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) { pCtx->roundTotal = pEpSet->numOfEps; @@ -382,12 +375,21 @@ int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) { pCtx->roundTimes++; if (pCtx->roundTimes >= pCtx->roundTotal) { + int64_t nowTs = taosGetTimestampMs(); + int64_t lastTime = nowTs - pCtx->startTs; + if (lastTime > tsMaxRetryWaitTime) { + SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d", + nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes); + SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR); + } + pCtx->periodMs *= tsRedirectFactor; if (pCtx->periodMs > tsRedirectMaxPeriod) { pCtx->periodMs = tsRedirectMaxPeriod; } - pTask->delayExecMs = pCtx->periodMs; + int64_t leftTime = tsMaxRetryWaitTime - lastTime; + pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs; goto _return; } @@ -410,7 +412,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 pTask->retryTimes = 0; } - SCH_ERR_JRET(schChkUpdateRedirectCtx(pTask, pData ? pData->pEpSet : NULL)); + SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL)); pTask->waitRetry = true; @@ -431,6 +433,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32 SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); SCH_SWITCH_EPSET(addr); SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps); + } else { + SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx); + SEp *pEp = &addr->epSet.eps[addr->epSet.inUse]; + SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port); } if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) { @@ -1141,15 +1147,13 @@ void schHandleTimerEvent(void *param, void *tmrId) { SSchJob *pJob = NULL; int32_t code = 0; - SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)); + if (schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)) { + return; + } - SCH_ERR_JRET(schLaunchTask(pJob, pTask)); + code = schLaunchTask(pJob, pTask); - return; - -_return: - - schHandleJobFailure(pJob, code); + schProcessOnCbEnd(pJob, pTask, code); } int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { @@ -1157,7 +1161,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam)); if (NULL == param) { SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam)); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } param->rId = pJob->refId; @@ -1167,8 +1171,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) { if (NULL == pTask->delayTimer) { pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer); if (NULL == pTask->delayTimer) { - SCH_TASK_ELOG("start delay timer failed"); - QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); + SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer); + SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } return TSDB_CODE_SUCCESS; @@ -1203,7 +1207,12 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) { while (pIter) { SSchTask *pTask = *(SSchTask **)pIter; + SCH_LOCK_TASK(pTask); + if (pTask->delayTimer) { + taosTmrStopA(&pTask->delayTimer); + } schDropTaskOnExecNode(pJob, pTask); + SCH_UNLOCK_TASK(pTask); pIter = taosHashIterate(list, pIter); }