fix: scheduler return code
This commit is contained in:
parent
cfa9435a20
commit
352abe0b5c
|
@ -32,9 +32,9 @@ extern "C" {
|
||||||
#define TSWAP(a, b) \
|
#define TSWAP(a, b) \
|
||||||
do { \
|
do { \
|
||||||
char *__tmp = (char*)alloca(sizeof(a)); \
|
char *__tmp = (char*)alloca(sizeof(a)); \
|
||||||
memcpy(__tmp, &(a), sizeof(a)); \
|
(void)memcpy(__tmp, &(a), sizeof(a)); \
|
||||||
memcpy(&(a), &(b), sizeof(a)); \
|
(void)memcpy(&(a), &(b), sizeof(a)); \
|
||||||
memcpy(&(b), __tmp, sizeof(a)); \
|
(void)memcpy(&(b), __tmp, sizeof(a)); \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#ifdef WINDOWS
|
#ifdef WINDOWS
|
||||||
|
|
|
@ -54,6 +54,7 @@ void taosMemoryTrim(int32_t size);
|
||||||
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
|
void *taosMemoryMallocAlign(uint32_t alignment, int64_t size);
|
||||||
|
|
||||||
#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n))
|
#define TAOS_MEMSET(_s, _c, _n) ((void)memset(_s, _c, _n))
|
||||||
|
// Copy _n bytes from _s to _d via memcpy; the pointer memcpy returns is explicitly discarded.
#define TAOS_MEMCPY(_d, _s, _n) \
  ((void)memcpy((_d), (_s), (_n)))
|
||||||
|
|
||||||
#define taosMemoryFreeClear(ptr) \
|
#define taosMemoryFreeClear(ptr) \
|
||||||
do { \
|
do { \
|
||||||
|
|
|
@ -336,7 +336,7 @@ extern SSchedulerMgmt schMgmt;
|
||||||
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
|
((_job)->attr.localExec && SCH_IS_QUERY_JOB(_job) && (!SCH_IS_INSERT_JOB(_job)) && \
|
||||||
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
|
(!SCH_IS_DATA_BIND_QRY_TASK(_task)))
|
||||||
|
|
||||||
#define SCH_UPDATE_REDIRECT_CODE(job, _code) atomic_val_compare_exchange_32(&((job)->redirectCode), 0, _code)
|
// Compare-exchange on (job)->redirectCode with comparand 0 and new value _code; the helper's
// return value is deliberately discarded. Presumably this stores _code only while no redirect
// code has been recorded yet -- the helper's exact semantics are not visible here, confirm.
#define SCH_UPDATE_REDIRECT_CODE(job, _code) \
  ((void)atomic_val_compare_exchange_32(&((job)->redirectCode), 0, (_code)))
|
||||||
#define SCH_GET_REDIRECT_CODE(job, _code) \
|
#define SCH_GET_REDIRECT_CODE(job, _code) \
|
||||||
(((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
|
(((!NO_RET_REDIRECT_ERROR(_code)) || (job)->redirectCode == 0) ? (_code) : (job)->redirectCode)
|
||||||
|
|
||||||
|
@ -413,7 +413,7 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_LOG_TASK_START_TS(_task) \
|
#define SCH_LOG_TASK_START_TS(_task) \
|
||||||
do { \
|
do { \
|
||||||
int64_t us = taosGetTimestampUs(); \
|
int64_t us = taosGetTimestampUs(); \
|
||||||
taosArrayPush((_task)->profile.execTime, &us); \
|
(void)taosArrayPush((_task)->profile.execTime, &us); \
|
||||||
if (0 == (_task)->execId) { \
|
if (0 == (_task)->execId) { \
|
||||||
(_task)->profile.startTs = us; \
|
(_task)->profile.startTs = us; \
|
||||||
} \
|
} \
|
||||||
|
@ -422,7 +422,10 @@ extern SSchedulerMgmt schMgmt;
|
||||||
#define SCH_LOG_TASK_WAIT_TS(_task) \
|
#define SCH_LOG_TASK_WAIT_TS(_task) \
|
||||||
do { \
|
do { \
|
||||||
int64_t us = taosGetTimestampUs(); \
|
int64_t us = taosGetTimestampUs(); \
|
||||||
(_task)->profile.waitTime += us - *(int64_t *)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
|
int64_t* startus = (int64_t*)taosArrayGet((_task)->profile.execTime, (_task)->execId); \
|
||||||
|
if (NULL != startus) { \
|
||||||
|
(_task)->profile.waitTime += us - *startus; \
|
||||||
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
#define SCH_LOG_TASK_END_TS(_task) \
|
#define SCH_LOG_TASK_END_TS(_task) \
|
||||||
|
@ -430,7 +433,9 @@ extern SSchedulerMgmt schMgmt;
|
||||||
int64_t us = taosGetTimestampUs(); \
|
int64_t us = taosGetTimestampUs(); \
|
||||||
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
|
int32_t idx = (_task)->execId % (_task)->maxExecTimes; \
|
||||||
int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
|
int64_t *startts = taosArrayGet((_task)->profile.execTime, (_task)->execId); \
|
||||||
*startts = us - *startts; \
|
if (NULL != startts) { \
|
||||||
|
*startts = us - *startts; \
|
||||||
|
} \
|
||||||
(_task)->profile.endTs = us; \
|
(_task)->profile.endTs = us; \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
||||||
|
@ -538,7 +543,7 @@ void schCleanClusterHb(void *pTrans);
|
||||||
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
int32_t schLaunchTask(SSchJob *job, SSchTask *task);
|
||||||
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
|
int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask);
|
||||||
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
|
int32_t schBuildAndSendMsg(SSchJob *job, SSchTask *task, SQueryNodeAddr *addr, int32_t msgType, void *param);
|
||||||
SSchJob *schAcquireJob(int64_t refId);
|
// Acquires the scheduler job identified by refId. The job pointer is handed back through *ppJob and
// the int32_t return carries a status code. The visible callers pre-set their pointer to NULL,
// discard the status, and treat a still-NULL pointer as "job does not exist / may be dropped".
// NOTE(review): presumably each successful acquire must be paired with schReleaseJob(refId) -- confirm.
int32_t schAcquireJob(int64_t refId, SSchJob **ppJob);
|
||||||
int32_t schReleaseJob(int64_t refId);
|
int32_t schReleaseJob(int64_t refId);
|
||||||
void schFreeFlowCtrl(SSchJob *pJob);
|
void schFreeFlowCtrl(SSchJob *pJob);
|
||||||
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
int32_t schChkJobNeedFlowCtrl(SSchJob *pJob, SSchLevel *pLevel);
|
||||||
|
@ -578,7 +583,7 @@ int32_t schJobFetchRows(SSchJob *pJob);
|
||||||
int32_t schJobFetchRowsA(SSchJob *pJob);
|
int32_t schJobFetchRowsA(SSchJob *pJob);
|
||||||
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
|
int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, void *handle, int32_t execId);
|
||||||
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
|
int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList);
|
||||||
char *schDumpEpSet(SEpSet *pEpSet);
|
// Dumps pEpSet into a string that is returned through *ppRes; the int32_t return value carries a
// status code rather than the string itself.
// NOTE(review): ownership of *ppRes (whether the caller must free it) is not visible here -- confirm.
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes);
|
||||||
char *schGetOpStr(SCH_OP_TYPE type);
|
char *schGetOpStr(SCH_OP_TYPE type);
|
||||||
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync);
|
||||||
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq);
|
||||||
|
|
|
@ -1165,7 +1165,9 @@ int32_t schProcessOnCbBegin(SSchJob **job, SSchTask **task, uint64_t qId, int64_
|
||||||
int8_t status = 0;
|
int8_t status = 0;
|
||||||
|
|
||||||
SSchTask *pTask = NULL;
|
SSchTask *pTask = NULL;
|
||||||
SSchJob *pJob = schAcquireJob(rId);
|
SSchJob *pJob = NULL;
|
||||||
|
|
||||||
|
(void)schAcquireJob(rId, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
|
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, qId, tId, rId);
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST);
|
||||||
|
|
|
@ -113,7 +113,7 @@ int32_t schProcessFetchRsp(SSchJob *pJob, SSchTask *pTask, char *msg, int32_t rs
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_store_ptr(&pJob->fetchRes, rsp);
|
atomic_store_ptr(&pJob->fetchRes, rsp);
|
||||||
atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
|
(void)atomic_add_fetch_64(&pJob->resNumOfRows, htobe64(rsp->numOfRows));
|
||||||
|
|
||||||
if (rsp->completed) {
|
if (rsp->completed) {
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_SUCC);
|
||||||
|
@ -166,13 +166,27 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
||||||
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
SCH_LOCK(SCH_WRITE, &pJob->resLock);
|
||||||
if (NULL == pJob->execRes.res) {
|
if (NULL == pJob->execRes.res) {
|
||||||
pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
|
pJob->execRes.res = (void*)taosArrayInit(batchRsp.nRsps, POINTER_BYTES);
|
||||||
|
if (NULL == pJob->execRes.res) {
|
||||||
|
code = terrno;
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
|
||||||
|
tDecoderClear(&coder);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
|
pJob->execRes.msgType = TDMT_VND_CREATE_TABLE;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
|
for (int32_t i = 0; i < batchRsp.nRsps; ++i) {
|
||||||
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
|
SVCreateTbRsp *rsp = batchRsp.pRsps + i;
|
||||||
if (rsp->pMeta) {
|
if (rsp->pMeta) {
|
||||||
taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta);
|
if (NULL == taosArrayPush((SArray*)pJob->execRes.res, &rsp->pMeta)) {
|
||||||
|
code = terrno;
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
|
||||||
|
tDecoderClear(&coder);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (TSDB_CODE_SUCCESS != rsp->code) {
|
if (TSDB_CODE_SUCCESS != rsp->code) {
|
||||||
|
@ -264,7 +278,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
|
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp->affectedRows);
|
||||||
|
|
||||||
int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
|
int32_t createTbRspNum = taosArrayGetSize(rsp->aCreateTbRsp);
|
||||||
SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
|
SCH_TASK_DLOG("submit succeed, affectedRows:%d, createTbRspNum:%d", rsp->affectedRows, createTbRspNum);
|
||||||
|
@ -275,7 +289,12 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
||||||
SSubmitRsp2 *sum = pJob->execRes.res;
|
SSubmitRsp2 *sum = pJob->execRes.res;
|
||||||
sum->affectedRows += rsp->affectedRows;
|
sum->affectedRows += rsp->affectedRows;
|
||||||
if (sum->aCreateTbRsp) {
|
if (sum->aCreateTbRsp) {
|
||||||
taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp);
|
if (NULL == taosArrayAddAll(sum->aCreateTbRsp, rsp->aCreateTbRsp)) {
|
||||||
|
code = terrno;
|
||||||
|
SCH_UNLOCK(SCH_WRITE, &pJob->resLock);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
|
|
||||||
taosArrayDestroy(rsp->aCreateTbRsp);
|
taosArrayDestroy(rsp->aCreateTbRsp);
|
||||||
} else {
|
} else {
|
||||||
TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
|
TSWAP(sum->aCreateTbRsp, rsp->aCreateTbRsp);
|
||||||
|
@ -313,10 +332,14 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
||||||
SDecoder coder = {0};
|
SDecoder coder = {0};
|
||||||
SVDeleteRsp rsp = {0};
|
SVDeleteRsp rsp = {0};
|
||||||
tDecoderInit(&coder, msg, msgSize);
|
tDecoderInit(&coder, msg, msgSize);
|
||||||
tDecodeSVDeleteRsp(&coder, &rsp);
|
if (tDecodeSVDeleteRsp(&coder, &rsp) < 0) {
|
||||||
|
code = terrno;
|
||||||
|
tDecoderClear(&coder);
|
||||||
|
SCH_ERR_JRET(code);
|
||||||
|
}
|
||||||
tDecoderClear(&coder);
|
tDecoderClear(&coder);
|
||||||
|
|
||||||
atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
||||||
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
|
SCH_TASK_DLOG("delete succeed, affectedRows:%" PRId64, rsp.affectedRows);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -351,7 +374,7 @@ int32_t schProcessResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SD
|
||||||
|
|
||||||
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
|
SCH_ERR_JRET(schSaveJobExecRes(pJob, &rsp));
|
||||||
|
|
||||||
atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
(void)atomic_add_fetch_64(&pJob->resNumOfRows, rsp.affectedRows);
|
||||||
|
|
||||||
taosMemoryFreeClear(msg);
|
taosMemoryFreeClear(msg);
|
||||||
|
|
||||||
|
@ -479,7 +502,7 @@ int32_t schHandleDropCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
|
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 " drop task rsp received, code:0x%x", pParam->queryId, pParam->taskId,
|
||||||
code);
|
code);
|
||||||
// called if drop task rsp received code
|
// called if drop task rsp received code
|
||||||
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
|
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
|
||||||
if (pMsg) {
|
if (pMsg) {
|
||||||
taosMemoryFree(pMsg->pData);
|
taosMemoryFree(pMsg->pData);
|
||||||
taosMemoryFree(pMsg->pEpSet);
|
taosMemoryFree(pMsg->pEpSet);
|
||||||
|
@ -501,7 +524,7 @@ int32_t schHandleNotifyCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
|
|
||||||
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
int32_t schHandleLinkBrokenCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
|
SSchCallbackParamHeader *head = (SSchCallbackParamHeader *)param;
|
||||||
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
|
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
|
||||||
|
|
||||||
qDebug("handle %p is broken", pMsg->handle);
|
qDebug("handle %p is broken", pMsg->handle);
|
||||||
|
|
||||||
|
@ -531,7 +554,7 @@ int32_t schHandleHbCallback(void *param, SDataBuf *pMsg, int32_t code) {
|
||||||
|
|
||||||
if (code) {
|
if (code) {
|
||||||
qError("hb rsp error:%s", tstrerror(code));
|
qError("hb rsp error:%s", tstrerror(code));
|
||||||
rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT);
|
(void)rpcReleaseHandle(pMsg->handle, TAOS_CONN_CLIENT); // ignore error
|
||||||
SCH_ERR_JRET(code);
|
SCH_ERR_JRET(code);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -585,9 +608,14 @@ int32_t schMakeCallbackParam(SSchJob *pJob, SSchTask *pTask, int32_t msgType, bo
|
||||||
param->head.isHbParam = true;
|
param->head.isHbParam = true;
|
||||||
|
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
taosMemoryFree(param);
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
param->nodeEpId.nodeId = addr->nodeId;
|
param->nodeEpId.nodeId = addr->nodeId;
|
||||||
SEp *pEp = SCH_GET_CUR_EP(addr);
|
SEp *pEp = SCH_GET_CUR_EP(addr);
|
||||||
strcpy(param->nodeEpId.ep.fqdn, pEp->fqdn);
|
TAOS_STRCPY(param->nodeEpId.ep.fqdn, pEp->fqdn);
|
||||||
param->nodeEpId.ep.port = pEp->port;
|
param->nodeEpId.ep.port = pEp->port;
|
||||||
param->pTrans = trans->pTrans;
|
param->pTrans = trans->pTrans;
|
||||||
*pParam = param;
|
*pParam = param;
|
||||||
|
@ -712,7 +740,7 @@ int32_t schMakeHbCallbackParam(SSchJob *pJob, SSchTask *pTask, void **pParam) {
|
||||||
|
|
||||||
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
|
int32_t schCloneHbRpcCtx(SRpcCtx *pSrc, SRpcCtx *pDst) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
memcpy(pDst, pSrc, sizeof(SRpcCtx));
|
TAOS_MEMCPY(pDst, pSrc, sizeof(SRpcCtx));
|
||||||
pDst->brokenVal.val = NULL;
|
pDst->brokenVal.val = NULL;
|
||||||
pDst->args = NULL;
|
pDst->args = NULL;
|
||||||
|
|
||||||
|
@ -760,7 +788,7 @@ int32_t schMakeHbRpcCtx(SSchJob *pJob, SSchTask *pTask, SRpcCtx *pCtx) {
|
||||||
SQueryNodeEpId epId = {0};
|
SQueryNodeEpId epId = {0};
|
||||||
|
|
||||||
epId.nodeId = addr->nodeId;
|
epId.nodeId = addr->nodeId;
|
||||||
memcpy(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
|
TAOS_MEMCPY(&epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
|
||||||
|
|
||||||
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
pCtx->args = taosHashInit(1, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||||
if (NULL == pCtx->args) {
|
if (NULL == pCtx->args) {
|
||||||
|
@ -877,7 +905,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(dst, pSrc, sizeof(*dst));
|
TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
|
||||||
*pDst = (SSchCallbackParamHeader *)dst;
|
*pDst = (SSchCallbackParamHeader *)dst;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -889,7 +917,7 @@ int32_t schCloneCallbackParam(SSchCallbackParamHeader *pSrc, SSchCallbackParamHe
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(dst, pSrc, sizeof(*dst));
|
TAOS_MEMCPY(dst, pSrc, sizeof(*dst));
|
||||||
*pDst = (SSchCallbackParamHeader *)dst;
|
*pDst = (SSchCallbackParamHeader *)dst;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
@ -904,7 +932,7 @@ int32_t schCloneSMsgSendInfo(void *src, void **dst) {
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(pDst, pSrc, sizeof(*pSrc));
|
TAOS_MEMCPY(pDst, pSrc, sizeof(*pSrc));
|
||||||
pDst->param = NULL;
|
pDst->param = NULL;
|
||||||
|
|
||||||
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
|
SCH_ERR_JRET(schCloneCallbackParam(pSrc->param, (SSchCallbackParamHeader **)&pDst->param));
|
||||||
|
@ -948,6 +976,10 @@ int32_t schAsyncSendMsg(SSchJob *pJob, SSchTask *pTask, SSchTrans *trans, SQuery
|
||||||
|
|
||||||
if (isHb && persistHandle && trans->pHandle == 0) {
|
if (isHb && persistHandle && trans->pHandle == 0) {
|
||||||
trans->pHandle = rpcAllocHandle();
|
trans->pHandle = rpcAllocHandle();
|
||||||
|
if (NULL == trans->pHandle) {
|
||||||
|
SCH_TASK_ELOG("rpcAllocHandle failed, code:%x", terrno);
|
||||||
|
SCH_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pJob && pTask) {
|
if (pJob && pTask) {
|
||||||
|
@ -1000,7 +1032,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
|
||||||
|
|
||||||
req.header.vgId = nodeEpId->nodeId;
|
req.header.vgId = nodeEpId->nodeId;
|
||||||
req.sId = schMgmt.sId;
|
req.sId = schMgmt.sId;
|
||||||
memcpy(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
|
TAOS_MEMCPY(&req.epId, nodeEpId, sizeof(SQueryNodeEpId));
|
||||||
|
|
||||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
|
SSchHbTrans *hb = taosHashGet(schMgmt.hbConnections, nodeEpId, sizeof(SQueryNodeEpId));
|
||||||
|
@ -1013,7 +1045,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
|
||||||
|
|
||||||
SCH_LOCK(SCH_WRITE, &hb->lock);
|
SCH_LOCK(SCH_WRITE, &hb->lock);
|
||||||
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
|
code = schCloneHbRpcCtx(&hb->rpcCtx, &rpcCtx);
|
||||||
memcpy(&trans, &hb->trans, sizeof(trans));
|
TAOS_MEMCPY(&trans, &hb->trans, sizeof(trans));
|
||||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
|
|
||||||
|
@ -1039,7 +1071,7 @@ int32_t schBuildAndSendHbMsg(SQueryNodeEpId *nodeEpId, SArray *taskAction) {
|
||||||
SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
|
SQueryNodeAddr addr = {.nodeId = nodeEpId->nodeId};
|
||||||
addr.epSet.inUse = 0;
|
addr.epSet.inUse = 0;
|
||||||
addr.epSet.numOfEps = 1;
|
addr.epSet.numOfEps = 1;
|
||||||
memcpy(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
|
TAOS_MEMCPY(&addr.epSet.eps[0], &nodeEpId->ep, sizeof(nodeEpId->ep));
|
||||||
|
|
||||||
code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
|
code = schAsyncSendMsg(NULL, NULL, &trans, &addr, msgType, msg, msgSize, true, &rpcCtx);
|
||||||
msg = NULL;
|
msg = NULL;
|
||||||
|
@ -1064,6 +1096,11 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
|
|
||||||
if (NULL == addr) {
|
if (NULL == addr) {
|
||||||
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("fail to get condidateAddr, candidateIdx %d, totalNum: %d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
|
|
||||||
isCandidateAddr = true;
|
isCandidateAddr = true;
|
||||||
SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
|
SCH_TASK_DLOG("target candidateIdx %d, epInUse %d/%d", pTask->candidateIdx, addr->epSet.inUse,
|
||||||
addr->epSet.numOfEps);
|
addr->epSet.numOfEps);
|
||||||
|
@ -1082,7 +1119,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
memcpy(msg, pTask->msg, msgSize);
|
TAOS_MEMCPY(msg, pTask->msg, msgSize);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1098,13 +1135,21 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
req.msg = pTask->msg;
|
req.msg = pTask->msg;
|
||||||
req.source = pJob->source;
|
req.source = pJob->source;
|
||||||
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
|
msgSize = tSerializeSVDeleteReq(NULL, 0, &req);
|
||||||
|
if (msgSize < 0) {
|
||||||
|
SCH_TASK_ELOG("tSerializeSVDeleteReq failed, code:%x", terrno);
|
||||||
|
SCH_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
msg = taosMemoryCalloc(1, msgSize);
|
msg = taosMemoryCalloc(1, msgSize);
|
||||||
if (NULL == msg) {
|
if (NULL == msg) {
|
||||||
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
SCH_TASK_ELOG("calloc %d failed", msgSize);
|
||||||
SCH_ERR_RET(TSDB_CODE_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
|
||||||
tSerializeSVDeleteReq(msg, msgSize, &req);
|
msgSize = tSerializeSVDeleteReq(msg, msgSize, &req);
|
||||||
|
if (msgSize < 0) {
|
||||||
|
SCH_TASK_ELOG("tSerializeSVDeleteReq second failed, code:%x", terrno);
|
||||||
|
SCH_ERR_JRET(terrno);
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TDMT_SCH_QUERY:
|
case TDMT_SCH_QUERY:
|
||||||
|
@ -1221,7 +1266,7 @@ int32_t schBuildAndSendMsg(SSchJob *pJob, SSchTask *pTask, SQueryNodeAddr *addr,
|
||||||
req.sId = schMgmt.sId;
|
req.sId = schMgmt.sId;
|
||||||
req.header.vgId = addr->nodeId;
|
req.header.vgId = addr->nodeId;
|
||||||
req.epId.nodeId = addr->nodeId;
|
req.epId.nodeId = addr->nodeId;
|
||||||
memcpy(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
|
TAOS_MEMCPY(&req.epId.ep, SCH_GET_CUR_EP(addr), sizeof(SEp));
|
||||||
|
|
||||||
msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
|
msgSize = tSerializeSSchedulerHbReq(NULL, 0, &req);
|
||||||
if (msgSize < 0) {
|
if (msgSize < 0) {
|
||||||
|
|
|
@ -43,7 +43,7 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) {
|
||||||
SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
|
SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0)));
|
||||||
break;
|
break;
|
||||||
case JOB_TASK_STATUS_DROP:
|
case JOB_TASK_STATUS_DROP:
|
||||||
schProcessOnJobDropped(pJob, *(int32_t*)param);
|
(void)schProcessOnJobDropped(pJob, *(int32_t*)param); // ignore error
|
||||||
|
|
||||||
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
|
if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) {
|
||||||
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
|
SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId);
|
||||||
|
@ -65,7 +65,8 @@ _return:
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
|
int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq) {
|
||||||
SSchJob* pJob = schAcquireJob(jobId);
|
SSchJob* pJob = NULL;
|
||||||
|
(void)schAcquireJob(jobId, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
|
qDebug("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, jobId);
|
||||||
SCH_ERR_RET(TSDB_CODE_SCH_JOB_NOT_EXISTS);
|
SCH_ERR_RET(TSDB_CODE_SCH_JOB_NOT_EXISTS);
|
||||||
|
@ -90,7 +91,7 @@ int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq
|
||||||
code = pJob->errCode;
|
code = pJob->errCode;
|
||||||
}
|
}
|
||||||
|
|
||||||
schReleaseJob(pJob->refId);
|
(void)schReleaseJob(pJob->refId); // ignore error
|
||||||
|
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
|
@ -165,7 +165,7 @@ int32_t schUpdateTaskHandle(SSchJob *pJob, SSchTask *pTask, bool dropExecNode, v
|
||||||
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
|
SCH_RET(schDropTaskExecNode(pJob, pTask, handle, execId));
|
||||||
}
|
}
|
||||||
|
|
||||||
schUpdateTaskExecNode(pJob, pTask, handle, execId);
|
SCH_ERR_RET(schUpdateTaskExecNode(pJob, pTask, handle, execId));
|
||||||
|
|
||||||
if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
|
if ((execId != pTask->execId || execId <= pTask->failedExecId) || pTask->waitRetry) { // ignore it
|
||||||
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
|
SCH_TASK_DLOG("handle not updated since execId %d is already not current execId %d, waitRetry %d", execId,
|
||||||
|
@ -297,7 +297,11 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < parentNum; ++i) {
|
for (int32_t i = 0; i < parentNum; ++i) {
|
||||||
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
SSchTask *parent = *(SSchTask **)taosArrayGet(pTask->parents, i);
|
||||||
|
if (NULL == parent) {
|
||||||
|
SCH_TASK_ELOG("fail to get task %d parent, parentNum: %d", i, parentNum);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
SCH_LOCK(SCH_WRITE, &parent->planLock);
|
SCH_LOCK(SCH_WRITE, &parent->planLock);
|
||||||
SDownstreamSourceNode source = {
|
SDownstreamSourceNode source = {
|
||||||
.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
.type = QUERY_NODE_DOWNSTREAM_SOURCE,
|
||||||
|
@ -308,9 +312,14 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) {
|
||||||
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
.fetchMsgType = SCH_FETCH_TYPE(pTask),
|
||||||
.localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
|
.localExec = SCH_IS_LOCAL_EXEC_TASK(pJob, pTask),
|
||||||
};
|
};
|
||||||
qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
code = qSetSubplanExecutionNode(parent->plan, pTask->plan->id.groupId, &source);
|
||||||
|
if (TSDB_CODE_SUCCESS != code) {
|
||||||
|
SCH_TASK_ELOG("qSetSubplanExecutionNode failed, groupId: %d", pTask->plan->id.groupId);
|
||||||
|
}
|
||||||
SCH_UNLOCK(SCH_WRITE, &parent->planLock);
|
SCH_UNLOCK(SCH_WRITE, &parent->planLock);
|
||||||
|
|
||||||
|
SCH_ERR_RET(code);
|
||||||
|
|
||||||
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
int32_t readyNum = atomic_add_fetch_32(&parent->childReady, 1);
|
||||||
|
|
||||||
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
if (SCH_TASK_READY_FOR_LAUNCH(readyNum, parent)) {
|
||||||
|
@ -413,16 +422,16 @@ void schResetTaskForRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
if (pTask->delayTimer) {
|
if (pTask->delayTimer) {
|
||||||
taosTmrStopA(&pTask->delayTimer);
|
(void)taosTmrStopA(&pTask->delayTimer); // ignore error
|
||||||
}
|
}
|
||||||
taosHashClear(pTask->execNodes);
|
taosHashClear(pTask->execNodes);
|
||||||
schRemoveTaskFromExecList(pJob, pTask);
|
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
||||||
schDeregisterTaskHb(pJob, pTask);
|
schDeregisterTaskHb(pJob, pTask);
|
||||||
taosMemoryFreeClear(pTask->msg);
|
taosMemoryFreeClear(pTask->msg);
|
||||||
pTask->msgLen = 0;
|
pTask->msgLen = 0;
|
||||||
pTask->lastMsgType = 0;
|
pTask->lastMsgType = 0;
|
||||||
pTask->childReady = 0;
|
pTask->childReady = 0;
|
||||||
memset(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
TAOS_MEMSET(&pTask->succeedAddr, 0, sizeof(pTask->succeedAddr));
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32_t rspCode) {
|
||||||
|
@ -443,11 +452,21 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
|
SCH_ERR_JRET(schUpdateTaskCandidateAddr(pJob, pTask, pData->pEpSet));
|
||||||
} else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
|
} else if (SYNC_SELF_LEADER_REDIRECT_ERROR(rspCode)) {
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
|
SEp *pEp = &addr->epSet.eps[addr->epSet.inUse];
|
||||||
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
|
SCH_TASK_DLOG("task retry node %d current ep, idx:%d/%d,%s:%d, code:%s", addr->nodeId, addr->epSet.inUse,
|
||||||
addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
|
addr->epSet.numOfEps, pEp->fqdn, pEp->port, tstrerror(rspCode));
|
||||||
} else {
|
} else {
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
SCH_SWITCH_EPSET(addr);
|
SCH_SWITCH_EPSET(addr);
|
||||||
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
SCH_TASK_DLOG("switch task target node %d epset to %d/%d", addr->nodeId, addr->epSet.inUse, addr->epSet.numOfEps);
|
||||||
}
|
}
|
||||||
|
@ -476,7 +495,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf *pData, int32
|
||||||
for (int32_t i = 0; i < childrenNum; ++i) {
|
for (int32_t i = 0; i < childrenNum; ++i) {
|
||||||
SSchTask *pChild = taosArrayGetP(pTask->children, i);
|
SSchTask *pChild = taosArrayGetP(pTask->children, i);
|
||||||
SCH_LOCK_TASK(pChild);
|
SCH_LOCK_TASK(pChild);
|
||||||
schDoTaskRedirect(pJob, pChild, NULL, rspCode);
|
(void)schDoTaskRedirect(pJob, pChild, NULL, rspCode); // error handled internal
|
||||||
SCH_UNLOCK_TASK(pChild);
|
SCH_UNLOCK_TASK(pChild);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -494,18 +513,23 @@ int32_t schResetTaskSetLevelInfo(SSchJob *pJob, SSchTask *pTask) {
|
||||||
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
|
atomic_load_32(&pLevel->taskExecDoneNum), atomic_load_32(&pLevel->taskLaunchedNum));
|
||||||
|
|
||||||
if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
|
if (SCH_GET_TASK_STATUS(pTask) >= JOB_TASK_STATUS_PART_SUCC) {
|
||||||
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
|
(void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
|
(void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
|
||||||
|
|
||||||
int32_t childrenNum = taosArrayGetSize(pTask->children);
|
int32_t childrenNum = taosArrayGetSize(pTask->children);
|
||||||
for (int32_t i = 0; i < childrenNum; ++i) {
|
for (int32_t i = 0; i < childrenNum; ++i) {
|
||||||
SSchTask *pChild = taosArrayGetP(pTask->children, i);
|
SSchTask *pChild = taosArrayGetP(pTask->children, i);
|
||||||
|
if (NULL == pChild) {
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth child, childrenNum:%d", i, childrenNum);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
SCH_LOCK_TASK(pChild);
|
SCH_LOCK_TASK(pChild);
|
||||||
pLevel = pChild->level;
|
pLevel = pChild->level;
|
||||||
atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
|
(void)atomic_sub_fetch_32(&pLevel->taskExecDoneNum, 1);
|
||||||
atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
|
(void)atomic_sub_fetch_32(&pLevel->taskLaunchedNum, 1);
|
||||||
SCH_UNLOCK_TASK(pChild);
|
SCH_UNLOCK_TASK(pChild);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -711,9 +735,9 @@ int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bo
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
|
int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||||
atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
(void)atomic_sub_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||||
|
|
||||||
schRemoveTaskFromExecList(pJob, pTask);
|
(void)schRemoveTaskFromExecList(pJob, pTask); // ignore error
|
||||||
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
SCH_SET_TASK_STATUS(pTask, JOB_TASK_STATUS_INIT);
|
||||||
|
|
||||||
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
if (SCH_TASK_NEED_FLOW_CTRL(pJob, pTask)) {
|
||||||
|
@ -724,6 +748,11 @@ int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
if (SCH_IS_DATA_BIND_TASK(pTask)) {
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("fail to the %dth condidateAddr, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
SCH_SWITCH_EPSET(addr);
|
SCH_SWITCH_EPSET(addr);
|
||||||
} else {
|
} else {
|
||||||
SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
|
SCH_ERR_RET(schSwitchTaskCandidateAddr(pJob, pTask));
|
||||||
|
@ -743,6 +772,11 @@ int32_t schSetAddrsFromNodeList(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < nodeNum; ++i) {
|
for (int32_t i = 0; i < nodeNum; ++i) {
|
||||||
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
|
SQueryNodeLoad *nload = taosArrayGet(pJob->nodeList, i);
|
||||||
|
if (NULL == nload) {
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth node in nodeList, nodeNum:%d", i, nodeNum);
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
SQueryNodeAddr *naddr = &nload->addr;
|
SQueryNodeAddr *naddr = &nload->addr;
|
||||||
|
|
||||||
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
if (NULL == taosArrayPush(pTask->candidateAddrs, naddr)) {
|
||||||
|
@ -810,6 +844,7 @@ int32_t schSetTaskCandidateAddrs(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
|
int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSet) {
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
|
if (NULL == pTask->candidateAddrs || 1 != taosArrayGetSize(pTask->candidateAddrs)) {
|
||||||
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
|
SCH_TASK_ELOG("not able to update cndidate addr, addr num %d",
|
||||||
(int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
|
(int32_t)(pTask->candidateAddrs ? taosArrayGetSize(pTask->candidateAddrs) : 0));
|
||||||
|
@ -817,18 +852,27 @@ int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet *pEpSe
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
SQueryNodeAddr *pAddr = taosArrayGet(pTask->candidateAddrs, 0);
|
||||||
|
if (NULL == pAddr) {
|
||||||
|
SCH_TASK_ELOG("fail to get task 0th condidataAddr, totalNum:%d", (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
|
||||||
|
}
|
||||||
|
|
||||||
char *origEpset = schDumpEpSet(&pAddr->epSet);
|
char *origEpset = NULL;
|
||||||
char *newEpset = schDumpEpSet(pEpSet);
|
char *newEpset = NULL;
|
||||||
|
|
||||||
|
SCH_ERR_RET(schDumpEpSet(&pAddr->epSet, &origEpset));
|
||||||
|
SCH_ERR_JRET(schDumpEpSet(pEpSet, &newEpset));
|
||||||
|
|
||||||
SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
|
SCH_TASK_DLOG("update task target node %d epset from %s to %s", pAddr->nodeId, origEpset, newEpset);
|
||||||
|
|
||||||
|
TAOS_MEMCPY(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
|
||||||
|
|
||||||
|
_return:
|
||||||
|
|
||||||
taosMemoryFree(origEpset);
|
taosMemoryFree(origEpset);
|
||||||
taosMemoryFree(newEpset);
|
taosMemoryFree(newEpset);
|
||||||
|
|
||||||
memcpy(&pAddr->epSet, pEpSet, sizeof(pAddr->epSet));
|
return code;
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
|
int32_t schSwitchTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
@ -877,7 +921,6 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
|
int32_t size = (int32_t)taosHashGetSize(pTask->execNodes);
|
||||||
|
|
||||||
if (size <= 0) {
|
if (size <= 0) {
|
||||||
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
SCH_TASK_DLOG("task has no execNodes, no need to drop it, status:%s", SCH_GET_TASK_STATUS_STR(pTask));
|
||||||
return;
|
return;
|
||||||
|
@ -889,7 +932,7 @@ void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask) {
|
||||||
if (nodeInfo->handle) {
|
if (nodeInfo->handle) {
|
||||||
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
|
SCH_SET_TASK_HANDLE(pTask, nodeInfo->handle);
|
||||||
void *pExecId = taosHashGetKey(nodeInfo, NULL);
|
void *pExecId = taosHashGetKey(nodeInfo, NULL);
|
||||||
schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId);
|
(void)schBuildAndSendMsg(pJob, pTask, &nodeInfo->addr, TDMT_SCH_DROP_TASK, pExecId); // ignore error and continue
|
||||||
|
|
||||||
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
|
SCH_TASK_DLOG("start to drop task's %dth execNode", i);
|
||||||
} else {
|
} else {
|
||||||
|
@ -939,6 +982,11 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId *pEpId, SArray *pStatusList) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < taskNum; ++i) {
|
for (int32_t i = 0; i < taskNum; ++i) {
|
||||||
STaskStatus *pStatus = taosArrayGet(pStatusList, i);
|
STaskStatus *pStatus = taosArrayGet(pStatusList, i);
|
||||||
|
if (NULL == pStatus) {
|
||||||
|
qError("fail to get the %dth task status in hb rsp, taskNum:%d", i, taskNum);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
|
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ",EID:%d task status in server: %s", pStatus->queryId, pStatus->taskId,
|
||||||
|
@ -983,10 +1031,15 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
||||||
|
|
||||||
for (int32_t i = 0; i < resNum; ++i) {
|
for (int32_t i = 0; i < resNum; ++i) {
|
||||||
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
|
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
|
||||||
|
if (NULL == localRsp) {
|
||||||
|
qError("fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
|
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", begin to handle LOCAL explain rsp msg", localRsp->qId, localRsp->tId);
|
||||||
|
|
||||||
pJob = schAcquireJob(localRsp->rId);
|
pJob = NULL;
|
||||||
|
(void)schAcquireJob(localRsp->rId, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
|
qWarn("QID:0x%" PRIx64 ",TID:0x%" PRIx64 "job no exist, may be dropped, refId:0x%" PRIx64, localRsp->qId,
|
||||||
localRsp->tId, localRsp->rId);
|
localRsp->tId, localRsp->rId);
|
||||||
|
@ -996,7 +1049,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
||||||
int8_t status = 0;
|
int8_t status = 0;
|
||||||
if (schJobNeedToStop(pJob, &status)) {
|
if (schJobNeedToStop(pJob, &status)) {
|
||||||
SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
|
SCH_TASK_DLOG("will not do further processing cause of job status %s", jobTaskStatusStr(status));
|
||||||
schReleaseJob(pJob->refId);
|
(void)schReleaseJob(pJob->refId);
|
||||||
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
|
SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1006,7 +1059,7 @@ int32_t schHandleExplainRes(SArray *pExplainRes) {
|
||||||
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
|
code = schProcessExplainRsp(pJob, pTask, &localRsp->rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
schReleaseJob(pJob->refId);
|
(void)schReleaseJob(pJob->refId);
|
||||||
|
|
||||||
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
|
qDebug("QID:0x%" PRIx64 ",TID:0x%" PRIx64 ", end to handle LOCAL explain rsp msg, code:%x", localRsp->qId,
|
||||||
localRsp->tId, code);
|
localRsp->tId, code);
|
||||||
|
@ -1022,6 +1075,11 @@ _return:
|
||||||
|
|
||||||
for (int32_t i = 0; i < resNum; ++i) {
|
for (int32_t i = 0; i < resNum; ++i) {
|
||||||
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
|
SExplainLocalRsp *localRsp = taosArrayGet(pExplainRes, i);
|
||||||
|
if (NULL == localRsp) {
|
||||||
|
qError("in _return fail to get the %dth LOCAL explain rsp msg, total:%d", i, resNum);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
tFreeSExplainRsp(&localRsp->rsp);
|
tFreeSExplainRsp(&localRsp->rsp);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1076,6 +1134,9 @@ int32_t schLaunchLocalTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||||
|
if (NULL == explainRes) {
|
||||||
|
SCH_ERR_RET(terrno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
SCH_ERR_JRET(qWorkerProcessLocalQuery(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||||
|
@ -1097,7 +1158,9 @@ _return:
|
||||||
|
|
||||||
int32_t schLaunchTaskImpl(void *param) {
|
int32_t schLaunchTaskImpl(void *param) {
|
||||||
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
|
SSchTaskCtx *pCtx = (SSchTaskCtx *)param;
|
||||||
SSchJob *pJob = schAcquireJob(pCtx->jobRid);
|
SSchJob *pJob = NULL;
|
||||||
|
|
||||||
|
(void)schAcquireJob(pCtx->jobRid, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
|
qDebug("job refId 0x%" PRIx64 " already not exist", pCtx->jobRid);
|
||||||
taosMemoryFree(param);
|
taosMemoryFree(param);
|
||||||
|
@ -1113,7 +1176,7 @@ int32_t schLaunchTaskImpl(void *param) {
|
||||||
int8_t status = 0;
|
int8_t status = 0;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
(void)atomic_add_fetch_32(&pTask->level->taskLaunchedNum, 1);
|
||||||
pTask->execId++;
|
pTask->execId++;
|
||||||
pTask->retryTimes++;
|
pTask->retryTimes++;
|
||||||
pTask->waitRetry = false;
|
pTask->waitRetry = false;
|
||||||
|
@ -1160,7 +1223,7 @@ _return:
|
||||||
SCH_UNLOCK_TASK(pTask);
|
SCH_UNLOCK_TASK(pTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
schReleaseJob(pJob->refId);
|
(void)schReleaseJob(pJob->refId);
|
||||||
|
|
||||||
taosMemoryFree(param);
|
taosMemoryFree(param);
|
||||||
|
|
||||||
|
@ -1178,7 +1241,7 @@ int32_t schAsyncLaunchTaskImpl(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
|
if (pJob->taskNum >= SCH_MIN_AYSNC_EXEC_NUM) {
|
||||||
param->asyncLaunch = true;
|
param->asyncLaunch = true;
|
||||||
taosAsyncExec(schLaunchTaskImpl, param, NULL);
|
SCH_ERR_RET(taosAsyncExec(schLaunchTaskImpl, param, NULL));
|
||||||
} else {
|
} else {
|
||||||
SCH_ERR_RET(schLaunchTaskImpl(param));
|
SCH_ERR_RET(schLaunchTaskImpl(param));
|
||||||
}
|
}
|
||||||
|
@ -1252,7 +1315,7 @@ int32_t schDelayLaunchTask(SSchJob *pJob, SSchTask *pTask) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
|
(void)taosTmrReset(schHandleTimerEvent, pTask->delayExecMs, (void *)param, schMgmt.timer, &pTask->delayTimer);
|
||||||
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
@ -1283,7 +1346,7 @@ void schDropTaskInHashList(SSchJob *pJob, SHashObj *list) {
|
||||||
|
|
||||||
SCH_LOCK_TASK(pTask);
|
SCH_LOCK_TASK(pTask);
|
||||||
if (pTask->delayTimer) {
|
if (pTask->delayTimer) {
|
||||||
taosTmrStopA(&pTask->delayTimer);
|
(void)taosTmrStopA(&pTask->delayTimer);
|
||||||
}
|
}
|
||||||
schDropTaskOnExecNode(pJob, pTask);
|
schDropTaskOnExecNode(pJob, pTask);
|
||||||
SCH_UNLOCK_TASK(pTask);
|
SCH_UNLOCK_TASK(pTask);
|
||||||
|
@ -1327,6 +1390,9 @@ int32_t schExecLocalFetch(SSchJob *pJob, SSchTask *pTask) {
|
||||||
|
|
||||||
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
if (SCH_IS_EXPLAIN_JOB(pJob)) {
|
||||||
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
explainRes = taosArrayInit(pJob->taskNum, sizeof(SExplainLocalRsp));
|
||||||
|
if (NULL == explainRes) {
|
||||||
|
SCH_ERR_RET(terrno);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
SCH_ERR_JRET(qWorkerProcessLocalFetch(schMgmt.queryMgmt, schMgmt.sId, pJob->queryId, pTask->taskId, pJob->refId,
|
||||||
|
|
|
@ -22,9 +22,14 @@
|
||||||
#include "tref.h"
|
#include "tref.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
FORCE_INLINE SSchJob *schAcquireJob(int64_t refId) {
|
FORCE_INLINE int32_t schAcquireJob(int64_t refId, SSchJob** ppJob) {
|
||||||
qDebug("sch acquire jobId:0x%" PRIx64, refId);
|
qDebug("sch acquire jobId:0x%" PRIx64, refId);
|
||||||
return (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
|
*ppJob = (SSchJob *)taosAcquireRef(schMgmt.jobRef, refId);
|
||||||
|
if (NULL == *ppJob) {
|
||||||
|
return terrno;
|
||||||
|
}
|
||||||
|
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
|
FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
|
||||||
|
@ -36,15 +41,16 @@ FORCE_INLINE int32_t schReleaseJob(int64_t refId) {
|
||||||
return taosReleaseRef(schMgmt.jobRef, refId);
|
return taosReleaseRef(schMgmt.jobRef, refId);
|
||||||
}
|
}
|
||||||
|
|
||||||
char *schDumpEpSet(SEpSet *pEpSet) {
|
int32_t schDumpEpSet(SEpSet *pEpSet, char** ppRes) {
|
||||||
|
*ppRes = NULL;
|
||||||
if (NULL == pEpSet) {
|
if (NULL == pEpSet) {
|
||||||
return NULL;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t maxSize = 1024;
|
int32_t maxSize = 1024;
|
||||||
char *str = taosMemoryMalloc(maxSize);
|
char *str = taosMemoryMalloc(maxSize);
|
||||||
if (NULL == str) {
|
if (NULL == str) {
|
||||||
return NULL;
|
return terrno;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t n = 0;
|
int32_t n = 0;
|
||||||
|
@ -54,7 +60,8 @@ char *schDumpEpSet(SEpSet *pEpSet) {
|
||||||
n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
|
n += snprintf(str + n, maxSize - n, "[%s:%d]", pEp->fqdn, pEp->port);
|
||||||
}
|
}
|
||||||
|
|
||||||
return str;
|
*ppRes = str;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *schGetOpStr(SCH_OP_TYPE type) {
|
char *schGetOpStr(SCH_OP_TYPE type) {
|
||||||
|
@ -73,7 +80,7 @@ char *schGetOpStr(SCH_OP_TYPE type) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void schFreeHbTrans(SSchHbTrans *pTrans) {
|
void schFreeHbTrans(SSchHbTrans *pTrans) {
|
||||||
rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT);
|
(void)rpcReleaseHandle((void *)pTrans->trans.pHandleId, TAOS_CONN_CLIENT);
|
||||||
|
|
||||||
schFreeRpcCtx(&pTrans->rpcCtx);
|
schFreeRpcCtx(&pTrans->rpcCtx);
|
||||||
}
|
}
|
||||||
|
@ -86,7 +93,7 @@ void schCleanClusterHb(void *pTrans) {
|
||||||
if (hb->trans.pTrans == pTrans) {
|
if (hb->trans.pTrans == pTrans) {
|
||||||
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
|
SQueryNodeEpId *pEpId = taosHashGetKey(hb, NULL);
|
||||||
schFreeHbTrans(hb);
|
schFreeHbTrans(hb);
|
||||||
taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
(void)taosHashRemove(schMgmt.hbConnections, pEpId, sizeof(SQueryNodeEpId));
|
||||||
}
|
}
|
||||||
|
|
||||||
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
hb = taosHashIterate(schMgmt.hbConnections, hb);
|
||||||
|
@ -109,7 +116,7 @@ int32_t schRemoveHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *ep
|
||||||
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
int64_t taskNum = atomic_load_64(&hb->taskNum);
|
||||||
if (taskNum <= 0) {
|
if (taskNum <= 0) {
|
||||||
schFreeHbTrans(hb);
|
schFreeHbTrans(hb);
|
||||||
taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
(void)taosHashRemove(schMgmt.hbConnections, epId, sizeof(SQueryNodeEpId));
|
||||||
}
|
}
|
||||||
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_WRITE, &schMgmt.hbLock);
|
||||||
|
|
||||||
|
@ -165,7 +172,7 @@ int32_t schRegisterHbConnection(SSchJob *pJob, SSchTask *pTask, SQueryNodeEpId *
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
atomic_add_fetch_64(&hb->taskNum, 1);
|
(void)atomic_add_fetch_64(&hb->taskNum, 1);
|
||||||
|
|
||||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
|
|
||||||
|
@ -178,12 +185,17 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
SQueryNodeEpId epId = {0};
|
SQueryNodeEpId epId = {0};
|
||||||
|
|
||||||
epId.nodeId = addr->nodeId;
|
epId.nodeId = addr->nodeId;
|
||||||
|
|
||||||
SEp *pEp = SCH_GET_CUR_EP(addr);
|
SEp *pEp = SCH_GET_CUR_EP(addr);
|
||||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn);
|
||||||
epId.ep.port = pEp->port;
|
epId.ep.port = pEp->port;
|
||||||
|
|
||||||
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_LOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
|
@ -197,7 +209,7 @@ void schDeregisterTaskHb(SSchJob *pJob, SSchTask *pTask) {
|
||||||
int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
|
int64_t taskNum = atomic_sub_fetch_64(&hb->taskNum, 1);
|
||||||
if (0 == taskNum) {
|
if (0 == taskNum) {
|
||||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
schRemoveHbConnection(pJob, pTask, &epId);
|
(void)schRemoveHbConnection(pJob, pTask, &epId);
|
||||||
} else {
|
} else {
|
||||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
}
|
}
|
||||||
|
@ -211,12 +223,17 @@ int32_t schEnsureHbConnection(SSchJob *pJob, SSchTask *pTask) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, pTask->candidateIdx);
|
||||||
|
if (NULL == addr) {
|
||||||
|
SCH_TASK_ELOG("fail to get the %dth condidateAddr in task, totalNum:%d", pTask->candidateIdx, (int32_t)taosArrayGetSize(pTask->candidateAddrs));
|
||||||
|
return TSDB_CODE_SCH_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
|
||||||
SQueryNodeEpId epId = {0};
|
SQueryNodeEpId epId = {0};
|
||||||
|
|
||||||
epId.nodeId = addr->nodeId;
|
epId.nodeId = addr->nodeId;
|
||||||
|
|
||||||
SEp *pEp = SCH_GET_CUR_EP(addr);
|
SEp *pEp = SCH_GET_CUR_EP(addr);
|
||||||
strcpy(epId.ep.fqdn, pEp->fqdn);
|
TAOS_STRCPY(epId.ep.fqdn, pEp->fqdn);
|
||||||
epId.ep.port = pEp->port;
|
epId.ep.port = pEp->port;
|
||||||
|
|
||||||
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
|
SCH_ERR_RET(schRegisterHbConnection(pJob, pTask, &epId));
|
||||||
|
@ -240,7 +257,7 @@ int32_t schUpdateHbConnection(SQueryNodeEpId *epId, SSchTrans *trans) {
|
||||||
}
|
}
|
||||||
|
|
||||||
SCH_LOCK(SCH_WRITE, &hb->lock);
|
SCH_LOCK(SCH_WRITE, &hb->lock);
|
||||||
memcpy(&hb->trans, trans, sizeof(*trans));
|
TAOS_MEMCPY(&hb->trans, trans, sizeof(*trans));
|
||||||
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
SCH_UNLOCK(SCH_WRITE, &hb->lock);
|
||||||
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
SCH_UNLOCK(SCH_READ, &schMgmt.hbLock);
|
||||||
|
|
||||||
|
@ -256,7 +273,7 @@ void schCloseJobRef(void) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (schMgmt.jobRef >= 0) {
|
if (schMgmt.jobRef >= 0) {
|
||||||
taosCloseRef(schMgmt.jobRef);
|
(void)taosCloseRef(schMgmt.jobRef);
|
||||||
schMgmt.jobRef = -1;
|
schMgmt.jobRef = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -169,7 +169,8 @@ void schedulerFreeJob(int64_t *jobId, int32_t errCode) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
SSchJob *pJob = schAcquireJob(*jobId);
|
SSchJob *pJob = NULL;
|
||||||
|
(void)schAcquireJob(*jobId, &pJob);
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
|
qWarn("Acquire sch job failed, may be dropped, jobId:0x%" PRIx64, *jobId);
|
||||||
return;
|
return;
|
||||||
|
|
|
@ -431,8 +431,11 @@ void *schtSendRsp(void *param) {
|
||||||
taosMsleep(1);
|
taosMsleep(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
pJob = schAcquireJob(job);
|
code = schAcquireJob(job, &pJob);
|
||||||
|
if (code) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
@ -457,8 +460,13 @@ void *schtSendRsp(void *param) {
|
||||||
|
|
||||||
void *schtCreateFetchRspThread(void *param) {
|
void *schtCreateFetchRspThread(void *param) {
|
||||||
int64_t job = *(int64_t *)param;
|
int64_t job = *(int64_t *)param;
|
||||||
SSchJob *pJob = schAcquireJob(job);
|
SSchJob *pJob = NULL;
|
||||||
|
|
||||||
|
(void)schAcquireJob(job, &pJob);
|
||||||
|
if (NULL == pJob) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
|
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
@ -572,7 +580,9 @@ void *schtRunJobThread(void *aa) {
|
||||||
code = schedulerExecJob(&req, &queryJobRefId);
|
code = schedulerExecJob(&req, &queryJobRefId);
|
||||||
assert(code == 0);
|
assert(code == 0);
|
||||||
|
|
||||||
pJob = schAcquireJob(queryJobRefId);
|
pJob = NULL;
|
||||||
|
code = schAcquireJob(queryJobRefId, &pJob);
|
||||||
|
|
||||||
if (NULL == pJob) {
|
if (NULL == pJob) {
|
||||||
taosArrayDestroy(qnodeList);
|
taosArrayDestroy(qnodeList);
|
||||||
schtFreeQueryDag(dag);
|
schtFreeQueryDag(dag);
|
||||||
|
@ -728,7 +738,9 @@ TEST(queryTest, normalCase) {
|
||||||
code = schedulerExecJob(&req, &job);
|
code = schedulerExecJob(&req, &job);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SSchJob *pJob = schAcquireJob(job);
|
SSchJob *pJob = NULL;
|
||||||
|
code = schAcquireJob(job, &pJob);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
|
@ -839,8 +851,10 @@ TEST(queryTest, readyFirstCase) {
|
||||||
code = schedulerExecJob(&req, &job);
|
code = schedulerExecJob(&req, &job);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SSchJob *pJob = schAcquireJob(job);
|
SSchJob *pJob = NULL;
|
||||||
|
code = schAcquireJob(job, &pJob);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||||
while (pIter) {
|
while (pIter) {
|
||||||
SSchTask *task = *(SSchTask **)pIter;
|
SSchTask *task = *(SSchTask **)pIter;
|
||||||
|
@ -956,7 +970,9 @@ TEST(queryTest, flowCtrlCase) {
|
||||||
code = schedulerExecJob(&req, &job);
|
code = schedulerExecJob(&req, &job);
|
||||||
ASSERT_EQ(code, 0);
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
SSchJob *pJob = schAcquireJob(job);
|
SSchJob *pJob = NULL;
|
||||||
|
code = schAcquireJob(job, &pJob);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
while (!queryDone) {
|
while (!queryDone) {
|
||||||
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
void *pIter = taosHashIterate(pJob->execTasks, NULL);
|
||||||
|
@ -1094,7 +1110,7 @@ TEST(otherTest, otherCase) {
|
||||||
schReleaseJob(0);
|
schReleaseJob(0);
|
||||||
schFreeRpcCtx(NULL);
|
schFreeRpcCtx(NULL);
|
||||||
|
|
||||||
ASSERT_EQ(schDumpEpSet(NULL), (char *)NULL);
|
ASSERT_EQ(schDumpEpSet(NULL, NULL), TSDB_CODE_SUCCESS);
|
||||||
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
|
ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0);
|
||||||
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
|
ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue