diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 3f4d21a746..8b54b88b28 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -141,8 +141,8 @@ int32_t qCreateQueryDag(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* // Set datasource of this subplan, multiple calls may be made to a subplan. // @subplan subplan to be schedule // @templateId templateId of a group of datasource subplans of this @subplan -// @eps Execution location of this group of datasource subplans, is an array of SEpAddr structures -int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SArray* eps); +// @ep one execution location of this group of datasource subplans +int32_t qSetSubplanExecutionNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep); int32_t qExplainQuery(const struct SQueryStmtInfo* pQueryInfo, struct SEpSet* pQnode, char** str); diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 47731dde77..1134c7763a 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -639,8 +639,6 @@ int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } - pQnodeEpSet->inUse = 0; - pQnodeEpSet->numOfEps = 0; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index c5ab74a85f..368f01e5ac 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -29,7 +29,7 @@ extern "C" { #define SCHEDULE_DEFAULT_JOB_NUMBER 1000 #define SCHEDULE_DEFAULT_TASK_NUMBER 1000 -#define SCHEDULE_MAX_CONDIDATE_EP_NUM 3 +#define SCH_MAX_CONDIDATE_EP_NUM TSDB_MAX_REPLICA enum { SCH_STATUS_NOT_START = 1, @@ -54,7 +54,6 @@ typedef struct SQueryTask { SEpAddr execAddr; // task actual executed node address SQueryProfileSummary summary; // task execution summary int32_t childReady; // child task ready number - SArray *childSrcEp; // child Eps, element is SEpAddr SArray *childern; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; @@ -73,9 +72,13 @@ typedef struct SQueryJob { int8_t status; SQueryProfileSummary summary; SEpSet dataSrcEps; + SEpAddr resEp; struct SCatalog *catalog; void *rpc; SEpSet *mgmtEpSet; + tsem_t rspSem; + int32_t userFetch; + void *res; SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 396930dc55..a7676b6c76 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -224,14 +224,21 @@ _return: SCH_RET(code); } -int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { - SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); - - if (epSet->numOfEps > SCHEDULE_MAX_CONDIDATE_EP_NUM) { +int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { + if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } - //TODO COPY dataSrcEps TO epSet + if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) { + SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); + } else { + for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) { + strncpy(epSet->fqdn[epSet->numOfEps], &job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn)); + epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; + + ++epSet->numOfEps; + } + } return TSDB_CODE_SUCCESS; } @@ -245,6 +252,10 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod } +int32_t schFetchFromRemote(SQueryJob *job) { + +} + int32_t schProcessOnJobSuccess(SQueryJob *job) { @@ -271,12 +282,15 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + strncpy(job->resEp.fqdn, task->execAddr.fqdn, sizeof(job->resEp.fqdn)); + job->resEp.port = task->execAddr.port; + SCH_ERR_RET(schProcessOnJobSuccess()); return TSDB_CODE_SUCCESS; } - if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCHEDULE_MAX_CONDIDATE_EP_NUM) { + if (SCH_IS_DATA_SRC_TASK(task) && job->dataSrcEps.numOfEps < SCH_MAX_CONDIDATE_EP_NUM) { strncpy(job->dataSrcEps.fqdn[job->dataSrcEps.numOfEps], task->execAddr.fqdn, sizeof(task->execAddr.fqdn)); job->dataSrcEps.port[job->dataSrcEps.numOfEps] = task->execAddr.port; @@ -288,10 +302,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { ++par->childReady; - if (NULL == taosArrayPush(par->childSrcEp, &task->execAddr)) { - qError("taosArrayPush failed"); - SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); - } + SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { SCH_ERR_RET(schTaskRun(job, task)); @@ -414,6 +425,8 @@ int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pM qError("taosHashInit %d failed", pDag->numOfSubplans); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + + tsem_init(&job->rspSem, 0, 0); SCH_ERR_JRET(schJobRun(job)); @@ -429,7 +442,26 @@ _return: SCH_RET(code); } -int32_t scheduleFetchRows(void *pRpc, void *pJob, void *data); +int32_t scheduleFetchRows(void *pRpc, void *pJob, void **data) { + if (NULL == pRpc || NULL == pJob || NULL == data) { + return TSDB_CODE_QRY_INVALID_INPUT; + } + + SQueryJob *job = pJob; + + if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { + qError("prior fetching not finished"); + return TSDB_CODE_QRY_APP_ERROR; + } + + SCH_ERR_RET(schFetchFromRemote(job)); + + tsem_wait(&job->rspSem); + + *data = job->res; + + return TSDB_CODE_SUCCESS; +} int32_t scheduleCancelJob(void *pRpc, void *pJob);