From 5ed3bb079ceeca30c65aac79feed881a03e9f656 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 14:31:04 +0800 Subject: [PATCH] feature/scheduler --- include/libs/scheduler/scheduler.h | 4 +- source/libs/scheduler/inc/schedulerInt.h | 5 +- source/libs/scheduler/src/scheduler.c | 47 ++++++++++++------- source/libs/scheduler/test/schedulerTests.cpp | 4 +- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 35daea3ebd..2cbf26f877 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -55,10 +55,10 @@ int32_t schedulerInit(SSchedulerCfg *cfg); /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. - * @param pJob + * @param qnodeList Qnode address list, element is SEpAddr * @return */ -int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index db75fc4fdd..73e7c4d24e 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -73,9 +73,8 @@ typedef struct SQueryJob { SQueryProfileSummary summary; SEpSet dataSrcEps; SEpAddr resEp; - struct SCatalog *catalog; - void *rpc; - SEpSet *mgmtEpSet; + void *transport; + SArray *qnodeList; tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a20b4a7ee1..6014ff9ab6 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -231,20 +231,27 @@ _return: SCH_RET(code); } -int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { +int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } - if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) { - SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); - } else { - for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) { - strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); - epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; - - ++epSet->numOfEps; - } + int32_t qnodeNum = taosArrayGetSize(job->qnodeList); + + for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { + SEpAddr *addr = taosArrayGet(job->qnodeList, i); + + strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); + epSet->port[epSet->numOfEps] = addr->port; + + ++epSet->numOfEps; + } + + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) { + strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); + epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; + + ++epSet->numOfEps; } return TSDB_CODE_SUCCESS; @@ -515,7 +522,12 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet)); + SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); + } + + if (plan->execEpSet.numOfEps <= 0) { + SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); @@ -554,20 +566,23 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { + if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + if (taosArrayGetSize(qnodeList) <= 0) { + qInfo("qnodeList is empty"); + } + int32_t code = 0; SQueryJob *job = calloc(1, sizeof(SQueryJob)); if (NULL == job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->catalog = pCatalog; - job->rpc = pRpc; - job->mgmtEpSet = (SEpSet *)pMgmtEps; + job->transport = transport; + job->qnodeList = qnodeList; SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 8869271dc2..9e94553058 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -79,17 +79,17 @@ TEST(testCase, normalCase) { char *clusterId = "cluster1"; char *dbname = "1.db1"; char *tablename = "table1"; - struct SCatalog* pCtg = (struct SCatalog*)mockPointer; SVgroupInfo vgInfo = {0}; void *pJob = NULL; SQueryDag dag = {0}; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); mockBuildDag(&dag); - code = scheduleExecJob(pCtg, mockPointer, (const SEpSet*)mockPointer, &dag, &pJob); + code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); }