diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index b98170f168..af8deff1a0 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -76,8 +76,6 @@ int32_t schedulerExecJob(SSchedulerReq* pReq, int64_t* pJob); int32_t schedulerFetchRows(int64_t jobId, SSchedulerReq* pReq); -void schedulerFetchRowsA(int64_t job, schedulerFetchFp fp, void* param); - int32_t schedulerGetTasksStatus(int64_t job, SArray* pSub); void schedulerStopQueryHb(void* pTrans); @@ -100,6 +98,8 @@ void schedulerFreeJob(int64_t* job, int32_t errCode); void schedulerDestroy(void); +int32_t schedulerValidatePlan(SQueryPlan* pPlan); + #ifdef __cplusplus } #endif diff --git a/source/libs/executor/test/queryPlanTests.cpp b/source/libs/executor/test/queryPlanTests.cpp index f03d347a78..176af60944 100755 --- a/source/libs/executor/test/queryPlanTests.cpp +++ b/source/libs/executor/test/queryPlanTests.cpp @@ -43,6 +43,7 @@ #include "querytask.h" #include "functionMgt.h" #include "ttime.h" +#include "scheduler.h" namespace { @@ -3091,12 +3092,11 @@ void qptExecPlan(SReadHandle* pReadHandle, SNode* pNode, SExecTaskInfo* pTaskInf dsDestroyDataSinker(handle); break; } - case QUERY_NODE_PHYSICAL_SUBPLAN: + case QUERY_NODE_PHYSICAL_SUBPLAN: { break; + } case QUERY_NODE_PHYSICAL_PLAN: { - SSchJob job = {0}; - qptCtx.result.code = schValidateAndBuildJob((SQueryPlan*)pNode, &job); - schFreeJobImpl(&job); + qptCtx.result.code = schedulerValidatePlan((SQueryPlan*)pNode); break; } case QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN: diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 1b6e086472..d1c970b9fe 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -794,9 +794,11 @@ void schFreeJobImpl(void *job) { } taosMemoryFree(pJob); - int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); - if (jobNum == 0) { - schCloseJobRef(); + if (refId > 0) { + int32_t jobNum = atomic_sub_fetch_32(&schMgmt.jobNum, 1); + if (jobNum == 0) { + schCloseJobRef(); + } } qDebug("QID:0x%" PRIx64 " sch job freed, refId:0x%" PRIx64 ", pointer:%p", queryId, refId, pJob); @@ -917,7 +919,7 @@ _return: if (NULL == pJob) { qDestroyQueryPlan(pReq->pDag); - } else if (pJob->refId < 0) { + } else if (pJob->refId <= 0) { schFreeJobImpl(pJob); } else { code = taosRemoveRef(schMgmt.jobRef, pJob->refId); diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 091de5c048..01d3204dc6 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -224,3 +224,26 @@ void schedulerDestroy(void) { qWorkerDestroy(&schMgmt.queryMgmt); schMgmt.queryMgmt = NULL; } + +int32_t schedulerValidatePlan(SQueryPlan* pPlan) { + int32_t code = TSDB_CODE_SUCCESS; + SSchJob *pJob = taosMemoryCalloc(1, sizeof(SSchJob)); + if (NULL == pJob) { + qError("QID:0x%" PRIx64 " calloc %d failed", pPlan->queryId, (int32_t)sizeof(SSchJob)); + SCH_ERR_RET(terrno); + } + + SCH_ERR_JRET(schValidateAndBuildJob(pPlan, pJob)); + + if (SCH_IS_EXPLAIN_JOB(pJob)) { + SCH_ERR_JRET(qExecExplainBegin(pPlan, &pJob->explainCtx, 0)); + } + +_return: + + schFreeJobImpl(pJob); + + return code; +} + +