From 4e279a137dfdc4bd098e6758ff24c8c0e83a8be4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 13:24:30 +0800 Subject: [PATCH 1/2] add scheduler ut case --- include/libs/scheduler/scheduler.h | 6 +- source/libs/scheduler/CMakeLists.txt | 2 + source/libs/scheduler/src/scheduler.c | 22 ++-- source/libs/scheduler/test/CMakeLists.txt | 18 +++ source/libs/scheduler/test/schedulerTests.cpp | 104 ++++++++++++++++++ 5 files changed, 143 insertions(+), 9 deletions(-) create mode 100644 source/libs/scheduler/test/CMakeLists.txt diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index c01b79d7ff..35daea3ebd 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -50,13 +50,15 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; +int32_t schedulerInit(SSchedulerCfg *cfg); + /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. 
* @param pJob * @return */ -int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); @@ -70,6 +72,8 @@ int32_t scheduleCancelJob(void *pJob); void scheduleFreeJob(void *pJob); +void schedulerDestroy(void); + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index cdb4e08205..6baaab1ef4 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -11,3 +11,5 @@ target_link_libraries( scheduler PRIVATE os util planner qcom common catalog transport ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2327fc5b04..a20b4a7ee1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -58,8 +58,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t m = 0; m < level->taskNum; ++m) { SQueryTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; - int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern); - int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents); + int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; + int32_t parentNum = plan->pParents ? 
(int32_t)taosArrayGetSize(plan->pParents) : 0; if (childNum > 0) { task->children = taosArrayInit(childNum, POINTER_BYTES); @@ -187,13 +187,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); - SQueryTask *task = taosArrayGet(level.subTasks, n); + SQueryTask task = {0}; - task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - task->plan = plan; - task->status = SCH_STATUS_NOT_START; + task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + task.plan = plan; + task.status = SCH_STATUS_NOT_START; - if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) { + void *p = taosArrayPush(level.subTasks, &task); + if (NULL == p) { + qError("taosArrayPush failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -548,7 +554,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { +int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/libs/scheduler/test/CMakeLists.txt b/source/libs/scheduler/test/CMakeLists.txt new file mode 100644 index 0000000000..00a6d08e5d --- /dev/null +++ b/source/libs/scheduler/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build scheduler unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(schedulerTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + schedulerTest + PUBLIC os util common catalog 
transport gtest qcom taos planner scheduler +) + +TARGET_INCLUDE_DIRECTORIES( + schedulerTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scheduler/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/scheduler/inc" +) diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index e69de29bb2..8869271dc2 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "catalog.h" +#include "scheduler.h" +#include "tep.h" +#include "trpc.h" + +namespace { +void mockBuildDag(SQueryDag *dag) { + uint64_t qId = 0x111111111111; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); + SArray *scan = taosArrayInit(1, sizeof(SSubplan)); + SArray *merge = taosArrayInit(1, sizeof(SSubplan)); + + SSubplan scanPlan = {0}; + SSubplan mergePlan = {0}; + + scanPlan.id.queryId = qId; + scanPlan.id.templateId = 0x2222222222; + scanPlan.id.subplanId = 0x3333333333; + scanPlan.type = QUERY_TYPE_SCAN; + scanPlan.level = 1; + scanPlan.execEpSet.numOfEps = 1; + scanPlan.pChildern = NULL; + scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); + + mergePlan.id.queryId = qId; + mergePlan.id.templateId = 0x4444444444; + mergePlan.id.subplanId = 0x5555555555; + mergePlan.type = QUERY_TYPE_MERGE; + mergePlan.level = 0; + mergePlan.execEpSet.numOfEps = 1; + mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); + mergePlan.pParents = NULL; + + SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); + SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); + + taosArrayPush(mergePointer->pChildern, &scanPointer); + taosArrayPush(scanPointer->pParents, &mergePointer); + + taosArrayPush(dag->pSubplans, &merge); + taosArrayPush(dag->pSubplans, &scan); +} + +} + +TEST(testCase, normalCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + struct SCatalog* pCtg = (struct SCatalog*)mockPointer; + SVgroupInfo vgInfo = {0}; + void *pJob = NULL; + SQueryDag dag = {0}; + + 
int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + mockBuildDag(&dag); + + code = scheduleExecJob(pCtg, mockPointer, (const SEpSet*)mockPointer, &dag, &pJob); + ASSERT_EQ(code, 0); +} + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + + + From 5ed3bb079ceeca30c65aac79feed881a03e9f656 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 14:31:04 +0800 Subject: [PATCH 2/2] feature/scheduler --- include/libs/scheduler/scheduler.h | 4 +- source/libs/scheduler/inc/schedulerInt.h | 5 +- source/libs/scheduler/src/scheduler.c | 47 ++++++++++++------- source/libs/scheduler/test/schedulerTests.cpp | 4 +- 4 files changed, 37 insertions(+), 23 deletions(-) diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 35daea3ebd..2cbf26f877 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -55,10 +55,10 @@ int32_t schedulerInit(SSchedulerCfg *cfg); /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. 
- * @param pJob + * @param qnodeList Qnode address list, element is SEpAddr * @return */ -int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index db75fc4fdd..73e7c4d24e 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -73,9 +73,8 @@ typedef struct SQueryJob { SQueryProfileSummary summary; SEpSet dataSrcEps; SEpAddr resEp; - struct SCatalog *catalog; - void *rpc; - SEpSet *mgmtEpSet; + void *transport; + SArray *qnodeList; tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index a20b4a7ee1..6014ff9ab6 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -231,20 +231,27 @@ _return: SCH_RET(code); } -int32_t schAvailableEpSet(SQueryJob *job, SEpSet *epSet) { +int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } - if (SCH_HAS_QNODE_IN_CLUSTER(schMgmt.cfg.clusterType)) { - SCH_ERR_RET(catalogGetQnodeList(job->catalog, job->rpc, job->mgmtEpSet, epSet)); - } else { - for (int32_t i = 0; i < job->dataSrcEps.numOfEps; ++i) { - strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); - epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; - - ++epSet->numOfEps; - } + int32_t qnodeNum = taosArrayGetSize(job->qnodeList); + + for (int32_t i = 0; i < qnodeNum && epSet->numOfEps < tListLen(epSet->port); ++i) { + SEpAddr *addr = taosArrayGet(job->qnodeList, i); + + strncpy(epSet->fqdn[epSet->numOfEps], addr->fqdn, sizeof(addr->fqdn)); + 
epSet->port[epSet->numOfEps] = addr->port; + + ++epSet->numOfEps; + } + + for (int32_t i = 0; i < job->dataSrcEps.numOfEps && epSet->numOfEps < tListLen(epSet->port); ++i) { + strncpy(epSet->fqdn[epSet->numOfEps], job->dataSrcEps.fqdn[i], sizeof(job->dataSrcEps.fqdn[i])); + epSet->port[epSet->numOfEps] = job->dataSrcEps.port[i]; + + ++epSet->numOfEps; } return TSDB_CODE_SUCCESS; @@ -515,7 +522,12 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { SCH_ERR_RET(qSubPlanToString(plan, &task->msg)); if (plan->execEpSet.numOfEps <= 0) { - SCH_ERR_RET(schAvailableEpSet(job, &plan->execEpSet)); + SCH_ERR_RET(schSetTaskExecEpSet(job, &plan->execEpSet)); + } + + if (plan->execEpSet.numOfEps <= 0) { + SCH_TASK_ERR_LOG("invalid execEpSet num:%d", plan->execEpSet.numOfEps); + SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } SCH_ERR_RET(schAsyncSendMsg(job, task, TSDB_MSG_TYPE_QUERY)); @@ -554,20 +566,23 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { + if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + if (taosArrayGetSize(qnodeList) <= 0) { + qInfo("qnodeList is empty"); + } + int32_t code = 0; SQueryJob *job = calloc(1, sizeof(SQueryJob)); if (NULL == job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->catalog = pCatalog; - job->rpc = pRpc; - job->mgmtEpSet = (SEpSet *)pMgmtEps; + job->transport = transport; + job->qnodeList = qnodeList; SCH_ERR_JRET(schValidateAndBuildJob(pDag, job)); diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 8869271dc2..9e94553058 100644 --- 
a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -79,17 +79,17 @@ TEST(testCase, normalCase) { char *clusterId = "cluster1"; char *dbname = "1.db1"; char *tablename = "table1"; - struct SCatalog* pCtg = (struct SCatalog*)mockPointer; SVgroupInfo vgInfo = {0}; void *pJob = NULL; SQueryDag dag = {0}; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); mockBuildDag(&dag); - code = scheduleExecJob(pCtg, mockPointer, (const SEpSet*)mockPointer, &dag, &pJob); + code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); }