add scheduler ut case
This commit is contained in:
parent
97da2ee958
commit
4e279a137d
|
@ -50,13 +50,15 @@ typedef struct SQueryProfileSummary {
|
||||||
uint64_t resultSize; // generated result size in Kb.
|
uint64_t resultSize; // generated result size in Kb.
|
||||||
} SQueryProfileSummary;
|
} SQueryProfileSummary;
|
||||||
|
|
||||||
|
int32_t schedulerInit(SSchedulerCfg *cfg);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Process the query job, generated according to the query physical plan.
|
* Process the query job, generated according to the query physical plan.
|
||||||
* This is a synchronized API, and is also thread-safety.
|
* This is a synchronized API, and is also thread-safety.
|
||||||
* @param pJob
|
* @param pJob
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
|
int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob);
|
||||||
|
|
||||||
int32_t scheduleFetchRows(void *pJob, void **data);
|
int32_t scheduleFetchRows(void *pJob, void **data);
|
||||||
|
|
||||||
|
@ -70,6 +72,8 @@ int32_t scheduleCancelJob(void *pJob);
|
||||||
|
|
||||||
void scheduleFreeJob(void *pJob);
|
void scheduleFreeJob(void *pJob);
|
||||||
|
|
||||||
|
void schedulerDestroy(void);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -11,3 +11,5 @@ target_link_libraries(
|
||||||
scheduler
|
scheduler
|
||||||
PRIVATE os util planner qcom common catalog transport
|
PRIVATE os util planner qcom common catalog transport
|
||||||
)
|
)
|
||||||
|
|
||||||
|
ADD_SUBDIRECTORY(test)
|
|
@ -58,8 +58,8 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) {
|
||||||
for (int32_t m = 0; m < level->taskNum; ++m) {
|
for (int32_t m = 0; m < level->taskNum; ++m) {
|
||||||
SQueryTask *task = taosArrayGet(level->subTasks, m);
|
SQueryTask *task = taosArrayGet(level->subTasks, m);
|
||||||
SSubplan *plan = task->plan;
|
SSubplan *plan = task->plan;
|
||||||
int32_t childNum = (int32_t)taosArrayGetSize(plan->pChildern);
|
int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0;
|
||||||
int32_t parentNum = (int32_t)taosArrayGetSize(plan->pParents);
|
int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0;
|
||||||
|
|
||||||
if (childNum > 0) {
|
if (childNum > 0) {
|
||||||
task->children = taosArrayInit(childNum, POINTER_BYTES);
|
task->children = taosArrayInit(childNum, POINTER_BYTES);
|
||||||
|
@ -187,13 +187,19 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) {
|
||||||
|
|
||||||
for (int32_t n = 0; n < levelPlanNum; ++n) {
|
for (int32_t n = 0; n < levelPlanNum; ++n) {
|
||||||
SSubplan *plan = taosArrayGet(levelPlans, n);
|
SSubplan *plan = taosArrayGet(levelPlans, n);
|
||||||
SQueryTask *task = taosArrayGet(level.subTasks, n);
|
SQueryTask task = {0};
|
||||||
|
|
||||||
task->taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
|
task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1);
|
||||||
task->plan = plan;
|
task.plan = plan;
|
||||||
task->status = SCH_STATUS_NOT_START;
|
task.status = SCH_STATUS_NOT_START;
|
||||||
|
|
||||||
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &task, POINTER_BYTES)) {
|
void *p = taosArrayPush(level.subTasks, &task);
|
||||||
|
if (NULL == p) {
|
||||||
|
qError("taosArrayPush failed");
|
||||||
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (0 != taosHashPut(planToTask, &plan, POINTER_BYTES, &p, POINTER_BYTES)) {
|
||||||
qError("taosHashPut failed");
|
qError("taosHashPut failed");
|
||||||
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY);
|
||||||
}
|
}
|
||||||
|
@ -548,7 +554,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t scheduleQueryJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) {
|
int32_t scheduleExecJob(struct SCatalog *pCatalog, void *pRpc, const SEpSet* pMgmtEps, SQueryDag* pDag, void** pJob) {
|
||||||
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) {
|
||||||
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,18 @@
|
||||||
|
|
||||||
|
MESSAGE(STATUS "build scheduler unit test")
|
||||||
|
|
||||||
|
# GoogleTest requires at least C++11
|
||||||
|
SET(CMAKE_CXX_STANDARD 11)
|
||||||
|
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
|
|
||||||
|
ADD_EXECUTABLE(schedulerTest ${SOURCE_LIST})
|
||||||
|
TARGET_LINK_LIBRARIES(
|
||||||
|
schedulerTest
|
||||||
|
PUBLIC os util common catalog transport gtest qcom taos planner scheduler
|
||||||
|
)
|
||||||
|
|
||||||
|
TARGET_INCLUDE_DIRECTORIES(
|
||||||
|
schedulerTest
|
||||||
|
PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/scheduler/"
|
||||||
|
PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/scheduler/inc"
|
||||||
|
)
|
|
@ -0,0 +1,104 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <tglobal.h>
|
||||||
|
#include <iostream>
|
||||||
|
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||||
|
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-function"
|
||||||
|
#pragma GCC diagnostic ignored "-Wunused-variable"
|
||||||
|
#pragma GCC diagnostic ignored "-Wsign-compare"
|
||||||
|
#include "os.h"
|
||||||
|
|
||||||
|
#include "taos.h"
|
||||||
|
#include "tdef.h"
|
||||||
|
#include "tvariant.h"
|
||||||
|
#include "catalog.h"
|
||||||
|
#include "scheduler.h"
|
||||||
|
#include "tep.h"
|
||||||
|
#include "trpc.h"
|
||||||
|
|
||||||
|
namespace {
|
||||||
|
void mockBuildDag(SQueryDag *dag) {
|
||||||
|
uint64_t qId = 0x111111111111;
|
||||||
|
|
||||||
|
dag->queryId = qId;
|
||||||
|
dag->numOfSubplans = 2;
|
||||||
|
dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES);
|
||||||
|
SArray *scan = taosArrayInit(1, sizeof(SSubplan));
|
||||||
|
SArray *merge = taosArrayInit(1, sizeof(SSubplan));
|
||||||
|
|
||||||
|
SSubplan scanPlan = {0};
|
||||||
|
SSubplan mergePlan = {0};
|
||||||
|
|
||||||
|
scanPlan.id.queryId = qId;
|
||||||
|
scanPlan.id.templateId = 0x2222222222;
|
||||||
|
scanPlan.id.subplanId = 0x3333333333;
|
||||||
|
scanPlan.type = QUERY_TYPE_SCAN;
|
||||||
|
scanPlan.level = 1;
|
||||||
|
scanPlan.execEpSet.numOfEps = 1;
|
||||||
|
scanPlan.pChildern = NULL;
|
||||||
|
scanPlan.pParents = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
|
||||||
|
mergePlan.id.queryId = qId;
|
||||||
|
mergePlan.id.templateId = 0x4444444444;
|
||||||
|
mergePlan.id.subplanId = 0x5555555555;
|
||||||
|
mergePlan.type = QUERY_TYPE_MERGE;
|
||||||
|
mergePlan.level = 0;
|
||||||
|
mergePlan.execEpSet.numOfEps = 1;
|
||||||
|
mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES);
|
||||||
|
mergePlan.pParents = NULL;
|
||||||
|
|
||||||
|
SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan);
|
||||||
|
SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan);
|
||||||
|
|
||||||
|
taosArrayPush(mergePointer->pChildern, &scanPointer);
|
||||||
|
taosArrayPush(scanPointer->pParents, &mergePointer);
|
||||||
|
|
||||||
|
taosArrayPush(dag->pSubplans, &merge);
|
||||||
|
taosArrayPush(dag->pSubplans, &scan);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST(testCase, normalCase) {
|
||||||
|
void *mockPointer = (void *)0x1;
|
||||||
|
char *clusterId = "cluster1";
|
||||||
|
char *dbname = "1.db1";
|
||||||
|
char *tablename = "table1";
|
||||||
|
struct SCatalog* pCtg = (struct SCatalog*)mockPointer;
|
||||||
|
SVgroupInfo vgInfo = {0};
|
||||||
|
void *pJob = NULL;
|
||||||
|
SQueryDag dag = {0};
|
||||||
|
|
||||||
|
int32_t code = schedulerInit(NULL);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
|
||||||
|
mockBuildDag(&dag);
|
||||||
|
|
||||||
|
code = scheduleExecJob(pCtg, mockPointer, (const SEpSet*)mockPointer, &dag, &pJob);
|
||||||
|
ASSERT_EQ(code, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
int main(int argc, char** argv) {
|
||||||
|
testing::InitGoogleTest(&argc, argv);
|
||||||
|
return RUN_ALL_TESTS();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue