feature/scheduler

dapan1121 2021-12-20 14:31:04 +08:00
parent 4e279a137d
commit 5ed3bb079c
4 changed files with 37 additions and 23 deletions

View File

@@ -55,10 +55,10 @@ int32_t schedulerInit(SSchedulerCfg *cfg);
 /**
  * Process the query job, generated according to the query physical plan.
  * This is a synchronized API, and is also thread-safety.
- * @param pJob
+ * @param qnodeList Qnode address list, element is SEpAddr
  * @return
  */
-int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
+int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob);
 
 int32_t scheduleFetchRows(void *pJob, void **data);
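
For reference, a minimal caller-side sketch of the reworked API (not part of the commit). The qnode endpoint, port number, and the pRpc/pDag variables are placeholders, and SEpAddr is assumed to expose fqdn/port fields as the scheduler code below uses them:

// Hypothetical usage of the new scheduleExecJob() signature.
SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));   // element type is SEpAddr
SEpAddr qnode = {.port = 6030};                          // placeholder qnode endpoint
strcpy(qnode.fqdn, "qnode1.example.com");
taosArrayPush(qnodeList, &qnode);

void   *pJob = NULL;
int32_t code = scheduleExecJob(pRpc /* rpc/transport handle */, qnodeList, pDag, &pJob);
if (TSDB_CODE_SUCCESS == code) {
  void *pData = NULL;
  code = scheduleFetchRows(pJob, &pData);                // synchronous fetch of the job result
}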

View File

@@ -73,9 +73,8 @@ typedef struct SQueryJob {
   SQueryProfileSummary summary;
   SEpSet dataSrcEps;
   SEpAddr resEp;
-  struct SCatalog *catalog;
-  void *rpc;
-  SEpSet *mgmtEpSet;
+  void *transport;
+  SArray *qnodeList;
   tsem_t rspSem;
   int32_t userFetch;
   int32_t remoteFetch;

View File

@@ -231,20 +231,27 @@ _return:
   SCH_RET(code);
 }
 
-int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) {
+int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) {
   if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) {
     return TSDB_CODE_SUCCESS;
   }
 
-  if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) {
-    SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet));
-  } else {
-    for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) {
-      strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
-      epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
-
-      ++epSet->numOfEps;
-    }
-  }
+  int32_t qnodeNum = taosArrayGetSize(job->qnodeList);
+
+  for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) {
+    SEpAddr *addr = taosArrayGet(job->qnodeList, i);
+
+    strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn));
+    epSet->port[epSet->numOfEps] = addr->port;
+
+    ++epSet->numOfEps;
+  }
+
+  for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) {
+    strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i]));
+    epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i];
+
+    ++epSet->numOfEps;
+  }
 
   return TSDB_CODE_SUCCESS;
@@ -515,7 +522,12 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) {
   SCH_ERR_RET(qSubPlanToString(plan, &task->msg));
 
   if (plan->execEpSet.numOfEps <= 0) {
-    SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet));
+    SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet));
+  }
+
+  if (plan->execEpSet.numOfEps <= 0) {
+    SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps);
+    SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR);
   }
 
   SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY));
@@ -554,20 +566,23 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
 }
 
-int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) {
-  if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
+int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) {
+  if (NULL == transport || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
     SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
   }
 
+  if (taosArrayGetSize(qnodeList) <= 0) {
+    qInfo("qnodeList is empty");
+  }
+
   int32_t code = 0;
   SQueryJob *job = calloc(1, sizeof(SQueryJob));
   if (NULL == job) {
     SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY);
   }
 
-  job->catalog = pCatalog;
-  job->rpc = pRpc;
-  job->mgmtEpSet = (SEpSet *)pMgmtEps;
+  job->transport = transport;
+  job->qnodeList = qnodeList;
 
   SCH_ERR_JRET(schValidateAndBuildJob(pDag, job));

View File

@@ -79,17 +79,17 @@ TEST(testCase, normalCase) {
   char *clusterId = "cluster1";
   char *dbname = "1.db1";
   char *tablename = "table1";
-  struct SCatalog* pCtg = (struct SCatalog*)mockPointer;
   SVgroupInfo vgInfo = {0};
   void *pJob = NULL;
   SQueryDag dag = {0};
+  SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr));
 
   int32_t code = schedulerInit(NULL);
   ASSERT_EQ(code, 0);
 
   mockBuildDag(&dag);
 
-  code = scheduleExecJob(pCtg, mockPointer, (const SEpSet*)mockPointer, &dag, &pJob);
+  code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob);
   ASSERT_EQ(code, 0);
 }