enh: support client redirect processing
This commit is contained in:
parent
7e010dcee9
commit
2fbf082f7d
|
@ -259,9 +259,15 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
||||||
#define NEED_CLIENT_HANDLE_ERROR(_code) \
|
#define NEED_CLIENT_HANDLE_ERROR(_code) \
|
||||||
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
|
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
|
||||||
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
|
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
|
||||||
|
|
||||||
|
#define SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
|
||||||
|
#define SYNC_SELF_LEADER_REDIRECT_ERROR(_code) ((_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_SYN_INTERNAL_ERROR)
|
||||||
|
#define SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) (false) // used later
|
||||||
|
|
||||||
#define NEED_REDIRECT_ERROR(_code) \
|
#define NEED_REDIRECT_ERROR(_code) \
|
||||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
|
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_RPC_NETWORK_UNAVAIL || \
|
||||||
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || (_code) == TSDB_CODE_SYN_NOT_LEADER || \
|
(_code) == TSDB_CODE_NODE_NOT_DEPLOYED || SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || \
|
||||||
|
SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || \
|
||||||
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
|
(_code) == TSDB_CODE_APP_NOT_READY || (_code) == TSDB_CODE_RPC_BROKEN_LINK)
|
||||||
|
|
||||||
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|
#define NEED_CLIENT_RM_TBLMETA_REQ(_type) \
|
||||||
|
@ -270,7 +276,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
|
||||||
|
|
||||||
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
|
#define NEED_SCHEDULER_REDIRECT_ERROR(_code) \
|
||||||
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
|
((_code) == TSDB_CODE_RPC_REDIRECT || (_code) == TSDB_CODE_NODE_NOT_DEPLOYED || \
|
||||||
(_code) == TSDB_CODE_SYN_NOT_LEADER || (_code) == TSDB_CODE_APP_NOT_READY)
|
SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(_code) || SYNC_SELF_LEADER_REDIRECT_ERROR(_code) || \
|
||||||
|
SYNC_OTHER_LEADER_REDIRECT_ERROR(_code) || (_code) == TSDB_CODE_APP_NOT_READY)
|
||||||
|
|
||||||
#define REQUEST_TOTAL_EXEC_TIMES 2
|
#define REQUEST_TOTAL_EXEC_TIMES 2
|
||||||
|
|
||||||
|
|
|
@ -92,6 +92,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0129)
|
#define TSDB_CODE_NO_AVAIL_DISK TAOS_DEF_ERROR_CODE(0, 0x0129)
|
||||||
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A)
|
#define TSDB_CODE_NOT_FOUND TAOS_DEF_ERROR_CODE(0, 0x012A)
|
||||||
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
|
#define TSDB_CODE_NO_DISKSPACE TAOS_DEF_ERROR_CODE(0, 0x012B)
|
||||||
|
#define TSDB_CODE_TIMEOUT_ERROR TAOS_DEF_ERROR_CODE(0, 0x012C)
|
||||||
|
|
||||||
//client
|
//client
|
||||||
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
|
#define TSDB_CODE_TSC_INVALID_OPERATION TAOS_DEF_ERROR_CODE(0, 0x0200)
|
||||||
|
|
|
@ -146,6 +146,7 @@ typedef struct SSchedulerMgmt {
|
||||||
int32_t jobRef;
|
int32_t jobRef;
|
||||||
int32_t jobNum;
|
int32_t jobNum;
|
||||||
SSchStat stat;
|
SSchStat stat;
|
||||||
|
void *timer;
|
||||||
SRWLatch hbLock;
|
SRWLatch hbLock;
|
||||||
SHashObj *hbConnections;
|
SHashObj *hbConnections;
|
||||||
void *queryMgmt;
|
void *queryMgmt;
|
||||||
|
@ -202,12 +203,30 @@ typedef struct SSchTaskProfile {
|
||||||
int64_t endTs;
|
int64_t endTs;
|
||||||
} SSchTaskProfile;
|
} SSchTaskProfile;
|
||||||
|
|
||||||
|
typedef struct SSchRedirectCtx {
|
||||||
|
int32_t periodMs;
|
||||||
|
bool inRedirect;
|
||||||
|
int32_t totalTimes;
|
||||||
|
int32_t roundTotal;
|
||||||
|
int32_t roundTimes; // retry times in current round
|
||||||
|
int64_t startTs;
|
||||||
|
} SSchRedirectCtx;
|
||||||
|
|
||||||
|
typedef struct SSchTimerParam {
|
||||||
|
int64_t rId;
|
||||||
|
uint64_t queryId;
|
||||||
|
uint64_t taskId;
|
||||||
|
} SSchTimerParam;
|
||||||
|
|
||||||
typedef struct SSchTask {
|
typedef struct SSchTask {
|
||||||
uint64_t taskId; // task id
|
uint64_t taskId; // task id
|
||||||
SRWLatch lock; // task reentrant lock
|
SRWLatch lock; // task reentrant lock
|
||||||
int32_t maxExecTimes; // task max exec times
|
int32_t maxExecTimes; // task max exec times
|
||||||
int32_t maxRetryTimes; // task max retry times
|
int32_t maxRetryTimes; // task max retry times
|
||||||
int32_t retryTimes; // task retry times
|
int32_t retryTimes; // task retry times
|
||||||
|
int32_t delayExecMs; // task execution delay time
|
||||||
|
tmr_h delayTimer; // task delay execution timer
|
||||||
|
SSchRedirectCtx redirectCtx; // task redirect context
|
||||||
bool waitRetry; // wait for retry
|
bool waitRetry; // wait for retry
|
||||||
int32_t execId; // task current execute index
|
int32_t execId; // task current execute index
|
||||||
SSchLevel *level; // level
|
SSchLevel *level; // level
|
||||||
|
@ -529,6 +548,7 @@ int32_t schJobFetchRows(SSchJob *pJob);
|
||||||
int32_t schJobFetchRowsA(SSchJob *pJob);
|
int32_t schJobFetchRowsA(SSchJob *pJob);
|
||||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
|
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
|
||||||
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
|
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
|
||||||
|
char *schDumpEpSet(SEpSet *pEpSet);
|
||||||
char *schGetOpStr(SCH_OP_TYPE type);
|
char *schGetOpStr(SCH_OP_TYPE type);
|
||||||
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
||||||
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
||||||
|
|
|
@ -340,6 +340,67 @@ int32_t schRescheduleTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t schChkUpdateRedirectCtx(SSchTask *pTask, SEpSet *pEpSet) {
|
||||||
|
SSchRedirectCtx *pCtx = &pTask->redirectCtx;
|
||||||
|
if (!pCtx->inRedirect) {
|
||||||
|
pCtx->inRedirect = true;
|
||||||
|
pCtx->periodMs = tsRedirectPeriod;
|
||||||
|
pCtx->startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
|
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||||
|
if (pEpSet) {
|
||||||
|
pCtx->roundTotal = pEpSet->numOfEps;
|
||||||
|
} else {
|
||||||
|
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
||||||
|
pCtx->roundTotal = pAddr->epSet.numOfEps;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
pCtx->roundTotal = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCtx->totalTimes++;
|
||||||
|
|
||||||
|
int64_t nowTs = taosGetTimestampMs();
|
||||||
|
if ((nowTs - pCtx->startTs) > tsMaxRetryWaitTime) {
|
||||||
|
SCH_TASK_DLOG("task no more redirect retry since timeout, now:%" PRId64 ", start:%" PRId64 ", max:%d, total:%d",
|
||||||
|
nowTs, pCtx->startTs, tsMaxRetryWaitTime, pCtx->totalTimes);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_TIMEOUT_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (SCH_IS_DATA_BIND_TASK(pTask) && pEpSet) {
|
||||||
|
pCtx->roundTotal = pEpSet->numOfEps;
|
||||||
|
pCtx->roundTimes = 0;
|
||||||
|
|
||||||
|
pTask->delayExecMs = 0;
|
||||||
|
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pCtx->roundTimes++;
|
||||||
|
|
||||||
|
if (pCtx->roundTimes >= pCtx->roundTotal) {
|
||||||
|
pCtx->periodMs *= tsRedirectFactor;
|
||||||
|
if (pCtx->periodMs > tsRedirectMaxPeriod) {
|
||||||
|
pCtx->periodMs = tsRedirectMaxPeriod;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTask->delayExecMs = pCtx->periodMs;
|
||||||
|
|
||||||
|
goto _return;
|
||||||
|
}
|
||||||
|
|
||||||
|
pTask->delayExecMs = 0;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
SCH_TASK_DLOG("task start %d/%d/%d redirect retry, delayExec:%d", pCtx->roundTimes, pCtx->roundTotal, pCtx->totalTimes, pTask->delayExecMs);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -349,14 +410,10 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
pTask->retryTimes = 0;
|
pTask->retryTimes = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (((pTask->execId + 1) >= pTask->maxExecTimes) || ((pTask->retryTimes + 1) > pTask->maxRetryTimes)) {
|
SCH_ERR_JRET(schChkUpdateRedirectCtx(pTask, pData ? pData->pEpSet : NULL));
|
||||||
SCH_TASK_DLOG("task no more retry since reach max times %d:%d, execId %d", pTask->maxRetryTimes,
|
|
||||||
pTask->maxExecTimes, pTask->execId);
|
|
||||||
schHandleJobFailure(pJob, rspCode);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
pTask->waitRetry = true;
|
pTask->waitRetry = true;
|
||||||
|
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
taosHashClear(pTask->execNodes);
|
taosHashClear(pTask->execNodes);
|
||||||
schRemoveTaskFromExecList(pJob, pTask);
|
schRemoveTaskFromExecList(pJob, pTask);
|
||||||
|
@ -368,8 +425,12 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||||
|
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||||
if (pData) {
|
if (pData && pData->pEpSet) {
|
||||||
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
|
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
|
||||||
|
} else if (SYNC_UNKNOWN_LEADER_REDIRECT_ERROR(rspCode)) {
|
||||||
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
SCH_SWITCH_EPSET(addr);
|
||||||
|
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||||
|
@ -380,7 +441,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
|
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
||||||
|
|
||||||
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
|
SCH_ERR_JRET(schDelayLaunchTask(pJob, pTask));
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -428,28 +489,24 @@ int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
|
schUpdateJobStatus(pJob, JOB_TASK_STATUS_EXEC);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
if (SYNC_OTHER_LEADER_REDIRECT_ERROR(rspCode)) {
|
||||||
if (NULL == pData->pEpSet) {
|
if (NULL == pData->pEpSet) {
|
||||||
SCH_TASK_ELOG("no epset updated while got error %s", tstrerror(rspCode));
|
SCH_TASK_ELOG("epset updating excepted, error:%s", tstrerror(rspCode));
|
||||||
code = rspCode;
|
code = TSDB_CODE_INVALID_MSG;
|
||||||
goto _return;
|
goto _return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
|
code = schDoTaskRedirect(pJob, pTask, pData, rspCode);
|
||||||
taosMemoryFree(pData->pData);
|
taosMemoryFreeClear(pData->pData);
|
||||||
taosMemoryFree(pData->pEpSet);
|
taosMemoryFreeClear(pData->pEpSet);
|
||||||
pData->pData = NULL;
|
|
||||||
pData->pEpSet = NULL;
|
|
||||||
|
|
||||||
SCH_RET(code);
|
SCH_RET(code);
|
||||||
|
|
||||||
_return:
|
_return:
|
||||||
|
|
||||||
taosMemoryFree(pData->pData);
|
taosMemoryFreeClear(pData->pData);
|
||||||
taosMemoryFree(pData->pEpSet);
|
taosMemoryFreeClear(pData->pEpSet);
|
||||||
pData->pData = NULL;
|
|
||||||
pData->pEpSet = NULL;
|
|
||||||
|
|
||||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||||
}
|
}
|
||||||
|
@ -715,10 +772,10 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
|
||||||
|
|
||||||
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
||||||
|
|
||||||
SEp *pOld = &pAddr->epSet.eps[pAddr->epSet.inUse];
|
char *origEpset = schDumpEpSet(&pAddr->epSet);
|
||||||
SEp *pNew = &pEpSet->eps[pEpSet->inUse];
|
char *newEpset = schDumpEpSet(pEpSet);
|
||||||
|
|
||||||
SCH_TASK_DLOG("update task ep from %s:%d to %s:%d", pOld->fqdn, pOld->port, pNew->fqdn, pNew->port);
|
SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
|
||||||
|
|
||||||
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
|
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
|
||||||
|
|
||||||
|
@ -1078,6 +1135,53 @@ _return:
|
||||||
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
SCH_RET(schProcessOnTaskFailure(pJob, pTask, code));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void schHandleTimerEvent(void *param, void *tmrId) {
|
||||||
|
SSchTimerParam *pTimerParam = (SSchTimerParam *)param;
|
||||||
|
SSchTask *pTask = NULL;
|
||||||
|
SSchJob *pJob = NULL;
|
||||||
|
int32_t code = 0;
|
||||||
|
|
||||||
|
SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pTimerParam->queryId, pTimerParam->rId, pTimerParam->taskId));
|
||||||
|
|
||||||
|
SCH_ERR_JRET(schLaunchTask(pJob, pTask));
|
||||||
|
|
||||||
|
return;
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
|
schHandleJobFailure(pJob, code);
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
if (pTask->delayExecMs > 0) {
|
||||||
|
SSchTimerParam *param = taosMemoryMalloc(sizeof(SSchTimerParam));
|
||||||
|
if (NULL == param) {
|
||||||
|
SCH_TASK_ELOG("taosMemoryMalloc %d failed", sizeof(SSchTimerParam));
|
||||||
|
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
param->rId = pJob->refId;
|
||||||
|
param->queryId = pJob->queryId;
|
||||||
|
param->taskId = pTask->taskId;
|
||||||
|
|
||||||
|
if (NULL == pTask->delayTimer) {
|
||||||
|
pTask->delayTimer = taosTmrStart(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer);
|
||||||
|
if (NULL == pTask->delayTimer) {
|
||||||
|
SCH_TASK_ELOG("start delay timer failed");
|
||||||
|
QW_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void*)param, schMgmt.timer, &pTask->delayTimer);
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SCH_RET(schLaunchTask(pJob, pTask));
|
||||||
|
}
|
||||||
|
|
||||||
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
|
int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level) {
|
||||||
SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
|
SCH_ERR_RET(schChkJobNeedFlowCtrl(pJob, level));
|
||||||
|
|
||||||
|
|
|
@ -36,6 +36,27 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
|
||||||
return taosReleaseRef(schMgmt.jobRef, refId);
|
return taosReleaseRef(schMgmt.jobRef, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
char *schDumpEpSet(SEpSet *pEpSet) {
|
||||||
|
if (NULL == pEpSet) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t maxSize = 1024;
|
||||||
|
char *str = taosMemoryMalloc(maxSize);
|
||||||
|
if (NULL == str) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t n = 0;
|
||||||
|
n += snprintf(str + n, maxSize - n, "numOfEps:%d, inUse:%d eps:", pEpSet->numOfEps, pEpSet->inUse);
|
||||||
|
for (int32_t i = 0; i < pEpSet->numOfEps; ++i) {
|
||||||
|
SEp *pEp = &pEpSet->eps[i];
|
||||||
|
n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
|
||||||
|
}
|
||||||
|
|
||||||
|
return str;
|
||||||
|
}
|
||||||
|
|
||||||
char *schGetOpStr(SCH_OP_TYPE type) {
|
char *schGetOpStr(SCH_OP_TYPE type) {
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case SCH_OP_NULL:
|
case SCH_OP_NULL:
|
||||||
|
|
|
@ -48,6 +48,12 @@ int32_t schedulerInit() {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
schMgmt.timer = taosTmrInit(0, 0, 0, "scheduler");
|
||||||
|
if (NULL == schMgmt.timer) {
|
||||||
|
qError("init timer failed, error:%s", tstrerror(terrno));
|
||||||
|
SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
|
if (taosGetSystemUUID((char *)&schMgmt.sId, sizeof(schMgmt.sId))) {
|
||||||
qError("generate schdulerId failed, errno:%d", errno);
|
qError("generate schdulerId failed, errno:%d", errno);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
|
SCH_ERR_RET(TSDB_CODE_QRY_SYS_ERROR);
|
||||||
|
|
|
@ -95,6 +95,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MSG_DECODE_ERROR, "Msg decode error")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NO_AVAIL_DISK, "No available disk")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NOT_FOUND, "Not found")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
|
TAOS_DEFINE_ERROR(TSDB_CODE_NO_DISKSPACE, "Out of disk space")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_TIMEOUT_ERROR, "Operation timeout")
|
||||||
|
|
||||||
//client
|
//client
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_OPERATION, "Invalid operation")
|
||||||
|
|
Loading…
Reference in New Issue