From 4e279a137dfdc4bd098e6758ff24c8c0e83a8be4 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 20 Dec 2021 13:24:30 +0800 Subject: [PATCH] add scheduler ut case --- include/libs/scheduler/scheduler.h | 6 +- source/libs/scheduler/CMakeLists.txt | 2 + source/libs/scheduler/src/scheduler.c | 22 ++-- source/libs/scheduler/test/CMakeLists.txt | 18 +++ source/libs/scheduler/test/schedulerTests.cpp | 104 ++++++++++++++++++ 5 files changed, 143 insertions(+), 9 deletions(-) create mode 100644 source/libs/scheduler/test/CMakeLists.txt diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index c01b79d7ff..35daea3ebd 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -50,13 +50,15 @@ typedef struct SQueryProfileSummary { uint64_t resultSize; // generated result size in Kb. } SQueryProfileSummary; +int32_t schedulerInit(SSchedulerCfg *cfg); + /** * Process the query job, generated according to the query physical plan. * This is a synchronized API, and is also thread-safety. * @param pJob * @return */ -int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); @@ -70,6 +72,8 @@ int32_t scheduleCancelJob(void *pJob); void scheduleFreeJob(void *pJob); +void schedulerDestroy(void); + #ifdef __cplusplus } #endif diff --git a/source/libs/scheduler/CMakeLists.txt b/source/libs/scheduler/CMakeLists.txt index cdb4e08205..6baaab1ef4 100644 --- a/source/libs/scheduler/CMakeLists.txt +++ b/source/libs/scheduler/CMakeLists.txt @@ -11,3 +11,5 @@ target_link_libraries( scheduler PRIVATE os util planner qcom common catalog transport ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 2327fc5b04..a20b4a7ee1 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -58,8 +58,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t m = 0; m < level->taskNum; ++m) { SQueryTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; - int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern); - int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents); + int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; + int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; if (childNum > 0) { task->children = taosArrayInit(childNum, POINTER_BYTES); @@ -187,13 +187,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); - SQueryTask *task = taosArrayGet(level.subTasks, n); + SQueryTask task = {0}; - task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); - task->plan = plan; - task->status = SCH_STATUS_NOT_START; + task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); + task.plan = plan; + task.status = SCH_STATUS_NOT_START; - if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) { + void *p = taosArrayPush(level.subTasks, &task); + if (NULL == p) { + qError("taosArrayPush failed"); + SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); + } + + if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -548,7 +554,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { +int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/libs/scheduler/test/CMakeLists.txt b/source/libs/scheduler/test/CMakeLists.txt new file mode 100644 index 0000000000..00a6d08e5d --- /dev/null +++ b/source/libs/scheduler/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build scheduler unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(schedulerTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + schedulerTest + PUBLIC os util common catalog transport gtest qcom taos planner scheduler +) + +TARGET_INCLUDE_DIRECTORIES( + schedulerTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scheduler/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/scheduler/inc" +) diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index e69de29bb2..8869271dc2 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -0,0 +1,104 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "catalog.h" +#include "scheduler.h" +#include "tep.h" +#include "trpc.h" + +namespace { +void mockBuildDag(SQueryDag *dag) { + uint64_t qId = 0x111111111111; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); + SArray *scan = taosArrayInit(1, sizeof(SSubplan)); + SArray *merge = taosArrayInit(1, sizeof(SSubplan)); + + SSubplan scanPlan = {0}; + SSubplan mergePlan = {0}; + + scanPlan.id.queryId = qId; + scanPlan.id.templateId = 0x2222222222; + scanPlan.id.subplanId = 0x3333333333; + scanPlan.type = QUERY_TYPE_SCAN; + scanPlan.level = 1; + scanPlan.execEpSet.numOfEps = 1; + scanPlan.pChildern = NULL; + scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); + + mergePlan.id.queryId = qId; + mergePlan.id.templateId = 0x4444444444; + mergePlan.id.subplanId = 0x5555555555; + mergePlan.type = QUERY_TYPE_MERGE; + mergePlan.level = 0; + mergePlan.execEpSet.numOfEps = 1; + mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); + mergePlan.pParents = NULL; + + SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); + SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); + + taosArrayPush(mergePointer->pChildern, &scanPointer); + taosArrayPush(scanPointer->pParents, &mergePointer); + + taosArrayPush(dag->pSubplans, &merge); + taosArrayPush(dag->pSubplans, &scan); +} + +} + +TEST(testCase, normalCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + struct SCatalog* pCtg = (struct SCatalog*)mockPointer; + SVgroupInfo vgInfo = {0}; + void *pJob = NULL; + SQueryDag dag = {0}; + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + mockBuildDag(&dag); + + code = scheduleExecJob(pCtg, mockPointer, (const SEpSet*)mockPointer, &dag, &pJob); + ASSERT_EQ(code, 0); +} + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + + +