enh: support max retry wait time configuration
This commit is contained in:
parent
2fbf082f7d
commit
072a73ec3d
|
@ -90,6 +90,10 @@ extern int32_t tsQueryNodeChunkSize;
|
||||||
extern bool tsQueryUseNodeAllocator;
|
extern bool tsQueryUseNodeAllocator;
|
||||||
extern bool tsKeepColumnName;
|
extern bool tsKeepColumnName;
|
||||||
extern bool tsEnableQueryHb;
|
extern bool tsEnableQueryHb;
|
||||||
|
extern int32_t tsRedirectPeriod;
|
||||||
|
extern int32_t tsRedirectFactor;
|
||||||
|
extern int32_t tsRedirectMaxPeriod;
|
||||||
|
extern int32_t tsMaxRetryWaitTime;
|
||||||
|
|
||||||
// client
|
// client
|
||||||
extern int32_t tsMinSlidingTime;
|
extern int32_t tsMinSlidingTime;
|
||||||
|
|
|
@ -87,6 +87,10 @@ bool tsQueryPlannerTrace = false;
|
||||||
int32_t tsQueryNodeChunkSize = 32 * 1024;
|
int32_t tsQueryNodeChunkSize = 32 * 1024;
|
||||||
bool tsQueryUseNodeAllocator = true;
|
bool tsQueryUseNodeAllocator = true;
|
||||||
bool tsKeepColumnName = false;
|
bool tsKeepColumnName = false;
|
||||||
|
int32_t tsRedirectPeriod = 100;
|
||||||
|
int32_t tsRedirectFactor = 5;
|
||||||
|
int32_t tsRedirectMaxPeriod = 10000;
|
||||||
|
int32_t tsMaxRetryWaitTime = 60000;
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||||
|
@ -301,6 +305,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
|
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
|
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
|
||||||
|
if (cfgAddInt32(pCfg, "maxRetryWaitTime", tsMaxRetryWaitTime, 0, 86400000, 0) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
|
tsNumOfTaskQueueThreads = tsNumOfCores / 2;
|
||||||
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
|
tsNumOfTaskQueueThreads = TMAX(tsNumOfTaskQueueThreads, 4);
|
||||||
|
@ -645,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
|
|
||||||
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
|
tsRpcRetryLimit = cfgGetItem(pCfg, "rpcRetryLimit")->i32;
|
||||||
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
|
tsRpcRetryInterval = cfgGetItem(pCfg, "rpcRetryInterval")->i32;
|
||||||
|
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -860,6 +866,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
||||||
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
|
||||||
} else if (strcasecmp("maxMemUsedByInsert", name) == 0) {
|
} else if (strcasecmp("maxMemUsedByInsert", name) == 0) {
|
||||||
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
|
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
|
||||||
|
} else if (strcasecmp("maxRetryWaitTime", name) == 0) {
|
||||||
|
tsMaxRetryWaitTime = cfgGetItem(pCfg, "maxRetryWaitTime")->i32;
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ extern "C" {
|
||||||
#include "tarray.h"
|
#include "tarray.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
#include "ttimer.h"
|
||||||
|
|
||||||
enum {
|
enum {
|
||||||
SCH_READ = 1,
|
SCH_READ = 1,
|
||||||
|
@ -507,6 +508,7 @@ extern SSchedulerMgmt schMgmt;
|
||||||
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask);
|
||||||
void schCleanClusterHb(void *pTrans);
|
void schCleanClusterHb(void *pTrans);
|
||||||
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||||
|
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
|
||||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
|
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType);
|
||||||
SSchJob *schAcquireJob(int64_t refId);
|
SSchJob *schAcquireJob(int64_t refId);
|
||||||
int32_t schReleaseJob(int64_t refId);
|
int32_t schReleaseJob(int64_t refId);
|
||||||
|
|
|
@ -887,9 +887,14 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
|
SCH_ERR_JRET(schGenerateCallBackInfo(pJob, pTask, msg, msgSize, msgType, trans, isHb, &pMsgSendInfo));
|
||||||
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
SCH_ERR_JRET(schUpdateSendTargetInfo(pMsgSendInfo, addr, pTask));
|
||||||
|
|
||||||
qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
|
if (pJob && pTask) {
|
||||||
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
|
SCH_TASK_DLOG("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
|
||||||
|
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
|
||||||
|
} else {
|
||||||
|
qDebug("start to send %s msg to node[%d,%s,%d], pTrans:%p, pHandle:%p", TMSG_INFO(msgType), addr->nodeId,
|
||||||
|
epSet->eps[epSet->inUse].fqdn, epSet->eps[epSet->inUse].port, trans->pTrans, trans->pHandle);
|
||||||
|
}
|
||||||
|
|
||||||
if (pTask) {
|
if (pTask) {
|
||||||
pTask->lastMsgType = msgType;
|
pTask->lastMsgType = msgType;
|
||||||
}
|
}
|
||||||
|
|
|
@ -340,7 +340,7 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) {
|
int32_t schChkUpdateRedirectCtx(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
|
||||||
SSchRedirectCtx *pCtx = &pTask->redirectCtx;
|
SSchRedirectCtx *pCtx = &pTask->redirectCtx;
|
||||||
if (!pCtx->inRedirect) {
|
if (!pCtx->inRedirect) {
|
||||||
pCtx->inRedirect = true;
|
pCtx->inRedirect = true;
|
||||||
|
@ -362,13 +362,6 @@ int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) {
|
||||||
}
|
}
|
||||||
|
|
||||||
pCtx->totalTimes++;
|
pCtx->totalTimes++;
|
||||||
|
|
||||||
int64_t nowTs = taosGetTimestampMs();
|
|
||||||
if ((nowTs - pCtx->startTs) > tsMaxRetryWaitTime) {
|
|
||||||
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
|
||||||
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
|
||||||
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
|
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
|
||||||
pCtx->roundTotal = pEpSet->numOfEps;
|
pCtx->roundTotal = pEpSet->numOfEps;
|
||||||
|
@ -382,12 +375,21 @@ int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) {
|
||||||
pCtx->roundTimes++;
|
pCtx->roundTimes++;
|
||||||
|
|
||||||
if (pCtx->roundTimes >= pCtx->roundTotal) {
|
if (pCtx->roundTimes >= pCtx->roundTotal) {
|
||||||
|
int64_t nowTs = taosGetTimestampMs();
|
||||||
|
int64_t lastTime = nowTs - pCtx->startTs;
|
||||||
|
if (lastTime > tsMaxRetryWaitTime) {
|
||||||
|
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
||||||
|
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
pCtx->periodMs *= tsRedirectFactor;
|
pCtx->periodMs *= tsRedirectFactor;
|
||||||
if (pCtx->periodMs > tsRedirectMaxPeriod) {
|
if (pCtx->periodMs > tsRedirectMaxPeriod) {
|
||||||
pCtx->periodMs = tsRedirectMaxPeriod;
|
pCtx->periodMs = tsRedirectMaxPeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
pTask->delayExecMs = pCtx->periodMs;
|
int64_t leftTime = tsMaxRetryWaitTime - lastTime;
|
||||||
|
pTask->delayExecMs = leftTime < pCtx->periodMs ? leftTime : pCtx->periodMs;
|
||||||
|
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
|
@ -410,7 +412,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
pTask->retryTimes = 0;
|
pTask->retryTimes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schChkUpdateRedirectCtx(pTask, pData ? pData->pEpSet : NULL));
|
SCH_ERR_JRET(schChkUpdateRedirectCtx(pJob, pTask, pData ? pData->pEpSet : NULL));
|
||||||
|
|
||||||
pTask->waitRetry = true;
|
pTask->waitRetry = true;
|
||||||
|
|
||||||
|
@ -431,6 +433,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
SCH_SWITCH_EPSET(addr);
|
SCH_SWITCH_EPSET(addr);
|
||||||
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
||||||
|
} else {
|
||||||
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
|
||||||
|
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps, pEp->fqdn, pEp->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||||
|
@ -1141,15 +1147,13 @@ void schHandleTimerEvent(void *param, void *tmrId) {
|
||||||
SSchJob *pJob = NULL;
|
SSchJob *pJob = NULL;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId));
|
if (schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
|
code = schLaunchTask(pJob, pTask);
|
||||||
|
|
||||||
return;
|
schProcessOnCbEnd(pJob, pTask, code);
|
||||||
|
|
||||||
_return:
|
|
||||||
|
|
||||||
schHandleJobFailure(pJob, code);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
@ -1157,7 +1161,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
|
SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
|
||||||
if (NULL == param) {
|
if (NULL == param) {
|
||||||
SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam));
|
SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam));
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
param->rId = pJob->refId;
|
param->rId = pJob->refId;
|
||||||
|
@ -1167,8 +1171,8 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
if (NULL == pTask->delayTimer) {
|
if (NULL == pTask->delayTimer) {
|
||||||
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
|
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
|
||||||
if (NULL == pTask->delayTimer) {
|
if (NULL == pTask->delayTimer) {
|
||||||
SCH_TASK_ELOG("start delay timer failed");
|
SCH_TASK_ELOG("start delay timer failed, handle:%p", schMgmt.timer);
|
||||||
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -1203,7 +1207,12 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SSchTask *pTask = *(SSchTask **)pIter;
|
SSchTask *pTask = *(SSchTask **)pIter;
|
||||||
|
|
||||||
|
SCH_LOCK_TASK(pTask);
|
||||||
|
if (pTask->delayTimer) {
|
||||||
|
taosTmrStopA(&pTask->delayTimer);
|
||||||
|
}
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
|
||||||
pIter = taosHashIterate(list, pIter);
|
pIter = taosHashIterate(list, pIter);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue