From 473ac84f2f4d608559f2830c9946bc323344417a Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Tue, 5 Jul 2022 18:12:01 +0800 Subject: [PATCH] fix: fix compile errors --- include/libs/qcom/query.h | 7 +++ include/libs/scheduler/scheduler.h | 15 ++---- source/client/src/clientImpl.c | 20 ++++---- source/client/src/clientMain.c | 2 +- source/libs/scheduler/inc/schInt.h | 40 ++++++++++----- source/libs/scheduler/src/schDbg.c | 6 +-- source/libs/scheduler/src/schFlowCtrl.c | 2 +- source/libs/scheduler/src/schJob.c | 49 +++++++++++-------- source/libs/scheduler/src/schRemote.c | 6 +-- source/libs/scheduler/src/schStatus.c | 16 ++++-- source/libs/scheduler/src/schTask.c | 6 +-- source/libs/scheduler/src/schUtil.c | 2 +- source/libs/scheduler/src/scheduler.c | 4 +- source/libs/scheduler/test/schedulerTests.cpp | 34 ++++++++----- 14 files changed, 125 insertions(+), 84 deletions(-) diff --git a/include/libs/qcom/query.h b/include/libs/qcom/query.h index 617b50aacc..a93cf1f9b8 100644 --- a/include/libs/qcom/query.h +++ b/include/libs/qcom/query.h @@ -68,6 +68,13 @@ typedef struct SIndexMeta { } SIndexMeta; +typedef struct SExecResult { + int32_t code; + uint64_t numOfRows; + int32_t msgType; + void* res; +} SExecResult; + typedef struct STbVerInfo { char tbFName[TSDB_TABLE_FNAME_LEN]; int32_t sversion; diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index ae4cbb498c..70ac7a6304 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -53,13 +53,6 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; -typedef struct SExecResult { - int32_t code; - uint64_t numOfRows; - int32_t msgType; - void* res; -} SExecResult; - typedef struct STaskInfo { SQueryNodeAddr addr; SSubQueryMsg *msg; @@ -70,7 +63,7 @@ typedef struct SSchdFetchParam { int32_t* code; } SSchdFetchParam; -typedef void (*schedulerExecFp)(SQueryResult* pResult, void* param, int32_t code); +typedef void (*schedulerExecFp)(SExecResult* pResult, void* param, int32_t code); typedef void (*schedulerFetchFp)(void* pResult, void* param, int32_t code); typedef bool (*schedulerChkKillFp)(void* param); @@ -87,7 +80,7 @@ typedef struct SSchedulerReq { schedulerChkKillFp chkKillFp; void* chkKillParam; SExecResult* pExecRes; - char** pFetchRes; + void** pFetchRes; } SSchedulerReq; @@ -95,7 +88,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg); int32_t schedulerExecJob(SSchedulerReq *pReq, int64_t *pJob); -int32_t schedulerFetchRows(int64_t job, void **data); +int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq *pReq); void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param); @@ -119,7 +112,7 @@ void schedulerFreeJob(int64_t* job, int32_t errCode); void schedulerDestroy(void); -void schdExecCallback(SQueryResult* pResult, void* param, int32_t code); +void schdExecCallback(SExecResult* pResult, void* param, int32_t code); #ifdef __cplusplus } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 3d43b3a9a1..542801954f 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -628,7 +628,7 @@ _return: int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList) { void* pTransporter = pRequest->pTscObj->pAppInfo->pTransporter; - SQueryResult res = {0}; + SExecResult res = {0}; SRequestConnInfo conn = {.pTrans = pRequest->pTscObj->pAppInfo->pTransporter, .requestId = pRequest->requestId, .requestObjRefId = pRequest->self}; @@ -640,14 +640,14 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryPlan* pDag, SArray* pNodeList .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, .execFp = NULL, - .execParam = NULL, + .cbParam = NULL, .chkKillFp = chkRequestKilled, - .chkKillParam = (void*)pRequest->self - .pQueryRes = &res, + .chkKillParam = (void*)pRequest->self, + .pExecRes = &res, }; int32_t code = schedulerExecJob(&req, &pRequest->body.queryJob); - pRequest->body.resInfo.execRes = res.res; + memcpy(&pRequest->body.resInfo.execRes, &res, sizeof(res)); if (code != TSDB_CODE_SUCCESS) { schedulerFreeJob(&pRequest->body.queryJob, 0); @@ -784,10 +784,10 @@ int32_t handleQueryExecRsp(SRequestObj* pRequest) { return code; } -void schedulerExecCb(SQueryResult* pResult, void* param, int32_t code) { +void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { SRequestObj* pRequest = (SRequestObj*)param; pRequest->code = code; - pRequest->body.resInfo.execRes = pResult->res; + memcpy(&pRequest->body.resInfo.execRes, pResult, sizeof(*pResult)); if (TDMT_VND_SUBMIT == pRequest->type || TDMT_VND_DELETE == pRequest->type || TDMT_VND_CREATE_TABLE == pRequest->type) { @@ -952,10 +952,10 @@ void launchAsyncQuery(SRequestObj* pRequest, SQuery* pQuery, SMetaData* pResultM .sql = pRequest->sqlstr, .startTs = pRequest->metric.start, .execFp = schedulerExecCb, - .execParam = pRequest, + .cbParam = pRequest, .chkKillFp = chkRequestKilled, .chkKillParam = (void*)pRequest->self, - .pQueryRes = NULL, + .pExecRes = NULL, }; code = schedulerExecJob(&req, &pRequest->body.queryJob); taosArrayDestroy(pNodeList); @@ -1398,7 +1398,7 @@ void* doFetchRows(SRequestObj* pRequest, bool setupOneRowPtr, bool convertUcs4) SReqResultInfo* pResInfo = &pRequest->body.resInfo; SSchedulerReq req = { .syncReq = true, - .pFetchRes = &pResInfo->pData, + .pFetchRes = (void**)&pResInfo->pData, }; pRequest->code = schedulerFetchRows(pRequest->body.queryJob, &req); if (pRequest->code != TSDB_CODE_SUCCESS) { diff --git a/source/client/src/clientMain.c b/source/client/src/clientMain.c index 2550a7a47b..1267b3ee0c 100644 --- a/source/client/src/clientMain.c +++ b/source/client/src/clientMain.c @@ -862,7 +862,7 @@ void taos_fetch_rows_a(TAOS_RES *res, __taos_async_fn_t fp, void *param) { SSchedulerReq req = { .syncReq = false, .fetchFp = fetchCallback, - .execParam = pRequest, + .cbParam = pRequest, }; schedulerFetchRows(pRequest->body.queryJob, &req); } diff --git a/source/libs/scheduler/inc/schInt.h b/source/libs/scheduler/inc/schInt.h index 819d51c4e7..ae120a42be 100644 --- a/source/libs/scheduler/inc/schInt.h +++ b/source/libs/scheduler/inc/schInt.h @@ -98,7 +98,7 @@ typedef struct SSchStat { } SSchStat; typedef struct SSchResInfo { - SQueryResult* queryRes; + SExecResult* execRes; void** fetchRes; schedulerExecFp execFp; schedulerFetchFp fetchFp; @@ -111,11 +111,6 @@ typedef struct SSchOpEvent { SSchedulerReq *pReq; } SSchOpEvent; -typedef struct SSchEvent { - SCH_EVENT_TYPE event; - void* info; -} SSchEvent; - typedef int32_t (*schStatusEnterFp)(void* pHandle, void* pParam); typedef int32_t (*schStatusLeaveFp)(void* pHandle, void* pParam); typedef int32_t (*schStatusEventFp)(void* pHandle, void* pParam, void* pEvent); @@ -315,9 +310,9 @@ extern SSchedulerMgmt schMgmt; #define SCH_GET_JOB_STATUS(job) atomic_load_8(&(job)->status) #define SCH_GET_JOB_STATUS_STR(job) jobTaskStatusStr(SCH_GET_JOB_STATUS(job)) -#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.sync) -#define SCH_JOB_IN_ASYNC_EXEC_OP(job) (((job)->opStatus.op == SCH_OP_EXEC) && (!(job)->opStatus.sync)) -#define SCH_JOB_IN_ASYNC_FETCH_OP(job) (((job)->opStatus.op == SCH_OP_FETCH) && (!(job)->opStatus.sync)) +#define SCH_JOB_IN_SYNC_OP(job) ((job)->opStatus.op && (job)->opStatus.syncReq) +#define SCH_JOB_IN_ASYNC_EXEC_OP(job) ((SCH_OP_EXEC == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_EXEC, SCH_OP_NULL)) && (!(job)->opStatus.syncReq)) +#define SCH_JOB_IN_ASYNC_FETCH_OP(job) ((SCH_OP_FETCH == atomic_val_compare_exchange_32(&(job)->opStatus.op, SCH_OP_FETCH, SCH_OP_NULL)) && (!(job)->opStatus.syncReq)) #define SCH_SET_JOB_NEED_FLOW_CTRL(_job) (_job)->attr.needFlowCtrl = true #define SCH_JOB_NEED_FLOW_CTRL(_job) ((_job)->attr.needFlowCtrl) @@ -355,7 +350,7 @@ extern SSchedulerMgmt schMgmt; #define SCH_SET_ERRNO(_err) do { if (TSDB_CODE_SCH_IGNORE_ERROR != (_err)) { terrno = (_err); } } while (0) #define SCH_ERR_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); return _code; } } while (0) #define SCH_RET(c) do { int32_t _code = c; if (_code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); } return _code; } while (0) -#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(_code); goto _return; } } while (0) +#define SCH_ERR_JRET(c) do { code = c; if (code != TSDB_CODE_SUCCESS) { SCH_SET_ERRNO(code); goto _return; } } while (0) #define SCH_LOCK(type, _lock) (SCH_READ == (type) ? taosRLockLatch(_lock) : taosWLockLatch(_lock)) #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) @@ -408,11 +403,32 @@ int32_t schProcessOnTaskStatusRsp(SQueryNodeEpId* pEpId, SArray* pStatusList); void schFreeSMsgSendInfo(SMsgSendInfo *msgSendInfo); char* schGetOpStr(SCH_OP_TYPE type); int32_t schBeginOperation(SSchJob *pJob, SCH_OP_TYPE type, bool sync); -int32_t schInitJob(SSchJob **pJob, SSchedulerReq *pReq); +int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq); int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq); -int32_t schDumpJobExecRes(SSchJob* pJob, SQueryResult* pRes); +int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes); int32_t schUpdateTaskCandidateAddr(SSchJob *pJob, SSchTask *pTask, SEpSet* pEpSet); int32_t schHandleRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32_t rspCode); +void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode); +int32_t schProcessOnOpBegin(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq); +void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode); +int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_t rId, uint64_t tId); +void schDropTaskOnExecNode(SSchJob *pJob, SSchTask *pTask); +bool schJobDone(SSchJob *pJob); +int32_t schRemoveTaskFromExecList(SSchJob *pJob, SSchTask *pTask); +int32_t schLaunchJobLowerLevel(SSchJob *pJob, SSchTask *pTask); +int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param); +int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SSchedulerReq* pReq); +int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode); +int32_t schHandleTaskRetry(SSchJob *pJob, SSchTask *pTask); +void schUpdateJobErrCode(SSchJob *pJob, int32_t errCode); +int32_t schTaskCheckSetRetry(SSchJob *pJob, SSchTask *pTask, int32_t errCode, bool *needRetry); +int32_t schProcessOnJobFailure(SSchJob *pJob, int32_t errCode); +int32_t schProcessOnJobPartialSuccess(SSchJob *pJob); +void schFreeTask(SSchJob *pJob, SSchTask *pTask); +void schDropTaskInHashList(SSchJob *pJob, SHashObj *list); +int32_t schLaunchLevelTasks(SSchJob *pJob, SSchLevel *level); +int32_t schGetTaskFromList(SHashObj *pTaskList, uint64_t taskId, SSchTask **pTask); +int32_t schInitTask(SSchJob *pJob, SSchTask *pTask, SSubplan *pPlan, SSchLevel *pLevel); #ifdef __cplusplus diff --git a/source/libs/scheduler/src/schDbg.c b/source/libs/scheduler/src/schDbg.c index 5c0c6fbb76..7f013b8f32 100644 --- a/source/libs/scheduler/src/schDbg.c +++ b/source/libs/scheduler/src/schDbg.c @@ -14,16 +14,16 @@ */ #include "query.h" -#include "schedulerInt.h" +#include "schInt.h" tsem_t schdRspSem; -void schdExecCallback(SQueryResult* pResult, void* param, int32_t code) { +void schdExecCallback(SExecResult* pResult, void* param, int32_t code) { if (code) { pResult->code = code; } - *(SQueryResult*)param = *pResult; + *(SExecResult*)param = *pResult; taosMemoryFree(pResult); diff --git a/source/libs/scheduler/src/schFlowCtrl.c b/source/libs/scheduler/src/schFlowCtrl.c index 85d205f5f2..6b34a394b6 100644 --- a/source/libs/scheduler/src/schFlowCtrl.c +++ b/source/libs/scheduler/src/schFlowCtrl.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "schedulerInt.h" +#include "schInt.h" #include "tmsg.h" #include "query.h" #include "catalog.h" diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index d514ed2a9f..c4923b8740 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -16,7 +16,7 @@ #include "catalog.h" #include "command.h" #include "query.h" -#include "schedulerInt.h" +#include "schInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" @@ -72,6 +72,8 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); return true; } + + return false; } int32_t schUpdateJobStatus(SSchJob *pJob, int8_t newStatus) { @@ -369,7 +371,7 @@ _return: int32_t schDumpJobExecRes(SSchJob* pJob, SExecResult* pRes) { pRes->code = atomic_load_32(&pJob->errCode); pRes->numOfRows = pJob->resNumOfRows; - pRes->res = pJob->execRes; + memcpy(pRes, &pJob->execRes, sizeof(pJob->execRes)); pJob->execRes.res = NULL; return TSDB_CODE_SUCCESS; @@ -406,15 +408,13 @@ int32_t schDumpJobFetchRes(SSchJob* pJob, void** pData) { } int32_t schNotifyUserExecRes(SSchJob* pJob) { - SQueryResult* pRes = taosMemoryCalloc(1, sizeof(SQueryResult)); + SExecResult* pRes = taosMemoryCalloc(1, sizeof(SExecResult)); if (pRes) { schDumpJobExecRes(pJob, pRes); } - schEndOperation(pJob); - SCH_JOB_DLOG("sch start to invoke exec cb, code: %s", tstrerror(pJob->errCode)); - (*pJob->userRes.execFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); + (*pJob->userRes.execFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode)); SCH_JOB_DLOG("sch end from exec cb, code: %s", tstrerror(pJob->errCode)); return TSDB_CODE_SUCCESS; @@ -425,10 +425,8 @@ int32_t schNotifyUserFetchRes(SSchJob* pJob) { schDumpJobFetchRes(pJob, &pRes); - schEndOperation(pJob); - SCH_JOB_DLOG("sch start to invoke fetch cb, code: %s", tstrerror(pJob->errCode)); - (*pJob->userRes.fetchFp)(pRes, pJob->userRes.userParam, atomic_load_32(&pJob->errCode)); + (*pJob->userRes.fetchFp)(pRes, pJob->userRes.cbParam, atomic_load_32(&pJob->errCode)); SCH_JOB_DLOG("sch end from fetch cb, code: %s", tstrerror(pJob->errCode)); return TSDB_CODE_SUCCESS; @@ -627,7 +625,7 @@ void schFreeJobImpl(void *job) { qDestroyQueryPlan(pJob->pDag); - taosMemoryFreeClear(pJob->userRes.queryRes); + taosMemoryFreeClear(pJob->userRes.execRes); taosMemoryFreeClear(pJob->resData); taosMemoryFree(pJob); @@ -648,10 +646,14 @@ int32_t schJobFetchRows(SSchJob *pJob) { if (pJob->opStatus.syncReq) { SCH_JOB_DLOG("sync wait for rsp now, job status:%s", SCH_GET_JOB_STATUS_STR(pJob)); tsem_wait(&pJob->rspSem); - schPostJobRes(pJob, SCH_OP_FETCH); + SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); } } else { - schPostJobRes(pJob, SCH_OP_FETCH); + if (pJob->opStatus.syncReq) { + SCH_RET(schDumpJobFetchRes(pJob, pJob->userRes.fetchRes)); + } else { + schPostJobRes(pJob, SCH_OP_FETCH); + } } SCH_RET(code); @@ -674,8 +676,6 @@ int32_t schInitJob(int64_t *pJobId, SSchedulerReq *pReq) { pJob->chkKillParam = pReq->chkKillParam; pJob->userRes.execFp = pReq->execFp; pJob->userRes.cbParam = pReq->cbParam; - pJob->opStatus.op = SCH_OP_EXEC; - pJob->opStatus.syncReq = pReq->syncReq; if (pReq->pNodeList == NULL || taosArrayGetSize(pReq->pNodeList) <= 0) { qDebug("QID:0x%" PRIx64 " input exec nodeList is empty", pReq->pDag->queryId); @@ -750,22 +750,27 @@ int32_t schExecJob(SSchJob *pJob, SSchedulerReq *pReq) { void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { + int32_t op = 0; + switch (type) { case SCH_OP_EXEC: - int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); +/* + op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); if (SCH_OP_NULL == op || op != type) { SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); } - - if (pReq) { +*/ + if (pReq && pReq->syncReq) { schDumpJobExecRes(pJob, pReq->pExecRes); } break; case SCH_OP_FETCH: - int32_t op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); +/* + op = atomic_val_compare_exchange_32(&pJob->opStatus.op, type, SCH_OP_NULL); if (SCH_OP_NULL == op || op != type) { SCH_JOB_ELOG("job not in %s operation, op:%s, status:%s", schGetOpStr(type), schGetOpStr(op), jobTaskStatusStr(pJob->status)); } +*/ break; case SCH_OP_GET_STATUS: errCode = TSDB_CODE_SUCCESS; @@ -775,7 +780,7 @@ void schProcessOnOpEnd(SSchJob *pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int } if (errCode) { - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode); } SCH_JOB_DLOG("job end %s operation with code %s", schGetOpStr(type), tstrerror(errCode)); @@ -846,7 +851,7 @@ void schProcessOnCbEnd(SSchJob *pJob, SSchTask *pTask, int32_t errCode) { } if (errCode) { - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, errCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&errCode); } if (pJob) { @@ -865,7 +870,6 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_ SCH_ERR_RET(TSDB_CODE_QRY_JOB_NOT_EXIST); } - int8_t status = 0; if (schJobNeedToStop(pJob, &status)) { SCH_TASK_ELOG("will not do further processing cause of job status %s", jobTaskStatusStr(status)); SCH_ERR_JRET(TSDB_CODE_SCH_IGNORE_ERROR); @@ -875,6 +879,9 @@ int32_t schProcessOnCbBegin(SSchJob** job, SSchTask** task, uint64_t qId, int64_ SCH_LOCK_TASK(pTask); + *job = pJob; + *task = pTask; + return TSDB_CODE_SUCCESS; _return: diff --git a/source/libs/scheduler/src/schRemote.c b/source/libs/scheduler/src/schRemote.c index 64368162e3..ab457847b9 100644 --- a/source/libs/scheduler/src/schRemote.c +++ b/source/libs/scheduler/src/schRemote.c @@ -16,7 +16,7 @@ #include "catalog.h" #include "command.h" #include "query.h" -#include "schedulerInt.h" +#include "schInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" @@ -378,7 +378,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { SSchTask *pTask = NULL; SSchJob *pJob = NULL; - SCH_TASK_DLOG("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + qDebug("begin to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); SCH_ERR_RET(schProcessOnCbBegin(&pJob, &pTask, pParam->queryId, pParam->refId, pParam->taskId)); @@ -390,7 +390,7 @@ int32_t schHandleCallback(void *param, SDataBuf *pMsg, int32_t rspCode) { taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(param); - SCH_TASK_DLOG("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); + qDebug("end to handle rsp msg, type:%s, handle:%p, code:%s", TMSG_INFO(pMsg->msgType), pMsg->handle, tstrerror(rspCode)); SCH_RET(code); } diff --git a/source/libs/scheduler/src/schStatus.c b/source/libs/scheduler/src/schStatus.c index 55bc600eca..80137f1872 100644 --- a/source/libs/scheduler/src/schStatus.c +++ b/source/libs/scheduler/src/schStatus.c @@ -37,10 +37,10 @@ int32_t schSwitchJobStatus(SSchJob* pJob, int32_t status, void* param) { case JOB_TASK_STATUS_SUCC: break; case JOB_TASK_STATUS_FAIL: - SCH_RET(schProcessOnJobFailure(pJob, (int32_t)param)); + SCH_RET(schProcessOnJobFailure(pJob, (param ? *(int32_t*)param : 0))); break; case JOB_TASK_STATUS_DROP: - SCH_ERR_JRET(schProcessOnJobDropped(pJob, (int32_t)param)); + SCH_ERR_JRET(schProcessOnJobDropped(pJob, *(int32_t*)param)); if (taosRemoveRef(schMgmt.jobRef, pJob->refId)) { SCH_JOB_ELOG("remove job from job list failed, refId:0x%" PRIx64, pJob->refId); @@ -73,14 +73,22 @@ int32_t schHandleOpBeginEvent(int64_t jobId, SSchJob** job, SCH_OP_TYPE type, SS SCH_RET(schProcessOnOpBegin(pJob, type, pReq)); } -void schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { +int32_t schHandleOpEndEvent(SSchJob* pJob, SCH_OP_TYPE type, SSchedulerReq* pReq, int32_t errCode) { + int32_t code = errCode; + if (NULL == pJob) { - return; + SCH_RET(code); } schProcessOnOpEnd(pJob, type, pReq, errCode); + if (TSDB_CODE_SCH_IGNORE_ERROR == errCode) { + code = pJob->errCode; + } + schReleaseJob(pJob->refId); + + return code; } diff --git a/source/libs/scheduler/src/schTask.c b/source/libs/scheduler/src/schTask.c index 1f89b59137..4da8ed446b 100644 --- a/source/libs/scheduler/src/schTask.c +++ b/source/libs/scheduler/src/schTask.c @@ -16,7 +16,7 @@ #include "catalog.h" #include "command.h" #include "query.h" -#include "schedulerInt.h" +#include "schInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" @@ -226,7 +226,7 @@ int32_t schProcessOnTaskSuccess(SSchJob *pJob, SSchTask *pTask) { } if (pTask->level->taskFailed > 0) { - SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, 0)); + SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, NULL)); } else { SCH_RET(schSwitchJobStatus(pJob, JOB_TASK_STATUS_PART_SUCC, NULL)); } @@ -294,7 +294,7 @@ int32_t schDoTaskRedirect(SSchJob *pJob, SSchTask *pTask, SDataBuf* pData, int32 if ((pTask->execId + 1) >= pTask->maxExecTimes) { SCH_TASK_DLOG("task no more retry since reach max try times, execId:%d", pTask->execId); - schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)rspCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_FAIL, (void*)&rspCode); return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/src/schUtil.c b/source/libs/scheduler/src/schUtil.c index 36a8475a34..f848dfa210 100644 --- a/source/libs/scheduler/src/schUtil.c +++ b/source/libs/scheduler/src/schUtil.c @@ -16,7 +16,7 @@ #include "catalog.h" #include "command.h" #include "query.h" -#include "schedulerInt.h" +#include "schInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 65ab9c7659..ebc4014e88 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -16,7 +16,7 @@ #include "catalog.h" #include "command.h" #include "query.h" -#include "schedulerInt.h" +#include "schInt.h" #include "tmsg.h" #include "tref.h" #include "trpc.h" @@ -148,7 +148,7 @@ void schedulerFreeJob(int64_t* jobId, int32_t errCode) { return; } - schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)errCode); + schSwitchJobStatus(pJob, JOB_TASK_STATUS_DROP, (void*)&errCode); *jobId = 0; } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 1a464b78ab..d6b1baf978 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -50,7 +50,7 @@ #pragma GCC diagnostic ignored "-Wreturn-type" #pragma GCC diagnostic ignored "-Wformat" -#include "schedulerInt.h" +#include "schInt.h" #include "stub.h" #include "tref.h" @@ -87,7 +87,7 @@ void schtInitLogFile() { } -void schtQueryCb(SQueryResult* pResult, void* param, int32_t code) { +void schtQueryCb(SExecResult* pResult, void* param, int32_t code) { assert(TSDB_CODE_SUCCESS == code); *(int32_t*)param = 1; } @@ -585,7 +585,10 @@ void* schtRunJobThread(void *aa) { atomic_store_32(&schtStartFetch, 1); void *data = NULL; - code = schedulerFetchRows(queryJobRefId, &data); + req.syncReq = true; + req.pFetchRes = &data; + + code = schedulerFetchRows(queryJobRefId, &req); assert(code == 0 || code); if (0 == code) { @@ -595,7 +598,7 @@ void* schtRunJobThread(void *aa) { } data = NULL; - code = schedulerFetchRows(queryJobRefId, &data); + code = schedulerFetchRows(queryJobRefId, &req); assert(code == 0 || code); schtFreeQueryJob(0); @@ -710,7 +713,10 @@ TEST(queryTest, normalCase) { taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); void *data = NULL; - code = schedulerFetchRows(job, &data); + req.syncReq = true; + req.pFetchRes = &data; + + code = schedulerFetchRows(job, &req); ASSERT_EQ(code, 0); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; @@ -719,7 +725,7 @@ TEST(queryTest, normalCase) { taosMemoryFreeClear(data); data = NULL; - code = schedulerFetchRows(job, &data); + code = schedulerFetchRows(job, &req); ASSERT_EQ(code, 0); ASSERT_TRUE(data == NULL); @@ -814,7 +820,9 @@ TEST(queryTest, readyFirstCase) { taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); void *data = NULL; - code = schedulerFetchRows(job, &data); + req.syncReq = true; + req.pFetchRes = &data; + code = schedulerFetchRows(job, &req); ASSERT_EQ(code, 0); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; @@ -823,7 +831,7 @@ TEST(queryTest, readyFirstCase) { taosMemoryFreeClear(data); data = NULL; - code = schedulerFetchRows(job, &data); + code = schedulerFetchRows(job, &req); ASSERT_EQ(code, 0); ASSERT_TRUE(data == NULL); @@ -926,7 +934,9 @@ TEST(queryTest, flowCtrlCase) { taosThreadCreate(&(thread1), &thattr, schtCreateFetchRspThread, &job); void *data = NULL; - code = schedulerFetchRows(job, &data); + req.syncReq = true; + req.pFetchRes = &data; + code = schedulerFetchRows(job, &req); ASSERT_EQ(code, 0); SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; @@ -935,7 +945,7 @@ TEST(queryTest, flowCtrlCase) { taosMemoryFreeClear(data); data = NULL; - code = schedulerFetchRows(job, &data); + code = schedulerFetchRows(job, &req); ASSERT_EQ(code, 0); ASSERT_TRUE(data == NULL); @@ -979,7 +989,7 @@ TEST(insertTest, normalCase) { TdThread thread1; taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); - SQueryResult res = {0}; + SExecResult res = {0}; SRequestConnInfo conn = {0}; conn.pTrans = mockPointer; @@ -991,7 +1001,7 @@ TEST(insertTest, normalCase) { req.execFp = schtQueryCb; req.cbParam = NULL; - code = schedulerExecJob(&req, &insertJobRefId, &res); + code = schedulerExecJob(&req, &insertJobRefId); ASSERT_EQ(code, 0); ASSERT_EQ(res.numOfRows, 20);