From 250e5152a02c29ba390d07ce99955716b23be1dd Mon Sep 17 00:00:00 2001 From: dapan Date: Sun, 26 Dec 2021 20:05:24 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/src/qworker.c | 5 + source/libs/qworker/test/CMakeLists.txt | 2 +- source/libs/qworker/test/qworkerTests.cpp | 75 ++++++++-- source/libs/scheduler/inc/schedulerInt.h | 27 ++-- source/libs/scheduler/src/scheduler.c | 99 +++++++------- source/libs/scheduler/test/schedulerTests.cpp | 129 ++++++++++++++++-- 6 files changed, 254 insertions(+), 83 deletions(-) diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 37d3e655c2..25f316001f 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -943,6 +943,11 @@ int32_t qWorkerProcessQueryMsg(void *node, void *qWorkerMgmt, SRpcMsg *pMsg) { qError("invalid query msg"); QW_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } + + msg->schedulerId = htobe64(msg->schedulerId); + msg->queryId = htobe64(msg->queryId); + msg->taskId = htobe64(msg->taskId); + msg->contentLen = ntohl(msg->contentLen); bool queryDone = false; bool queryRsp = false; diff --git a/source/libs/qworker/test/CMakeLists.txt b/source/libs/qworker/test/CMakeLists.txt index 6fb5e4d5c0..6d755ad487 100644 --- a/source/libs/qworker/test/CMakeLists.txt +++ b/source/libs/qworker/test/CMakeLists.txt @@ -8,7 +8,7 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST}) TARGET_LINK_LIBRARIES( qworkerTest - PUBLIC os util common transport gtest qcom + PUBLIC os util common transport gtest qcom planner qworker ) TARGET_INCLUDE_DIRECTORIES( diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index fb1ba5c43f..4b54b77544 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -28,40 +28,89 @@ #include "tvariant.h" #include "tep.h" #include "trpc.h" +#include "planner.h" +#include "qworker.h" #include "stub.h" #include "addr_any.h" namespace { - -} - -void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - SUseDbRsp *rspMsg = NULL; //todo - - return; +int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { + return 0; } -void initTestEnv() { +void stubSetStringToPlan() { static Stub stub; - stub.set(rpcSendRecv, __rpcSendRecv); + stub.set(qStringToSubplan, qwtStringToPlan); { - AddrAny any("libtransport.so"); + AddrAny any("libplanner.so"); std::map result; - any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + any.get_global_func_addr_dynsym("^qStringToSubplan$", result); for (const auto& f : result) { - stub.set(f.second, __rpcSendRecv); + stub.set(f.second, qwtStringToPlan); } } } -TEST(testCase, normalCase) { } +TEST(testCase, normalCase) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + SRpcMsg queryRpc = {0}; + SRpcMsg readyRpc = {0}; + SRpcMsg fetchRpc = {0}; + SRpcMsg dropRpc = {0}; + SSubQueryMsg *queryMsg = (SSubQueryMsg *)calloc(1, sizeof(SSubQueryMsg) + 100); + queryMsg->queryId = htobe64(1); + queryMsg->schedulerId = htobe64(1); + queryMsg->taskId = htobe64(1); + queryMsg->contentLen = htonl(100); + queryRpc.pCont = queryMsg; + + SResReadyMsg readyMsg = {0}; + readyMsg.schedulerId = htobe64(1); + readyMsg.queryId = htobe64(1); + readyMsg.taskId = htobe64(1); + readyRpc.pCont = &readyMsg; + + SResFetchMsg fetchMsg = {0}; + fetchMsg.schedulerId = htobe64(1); + fetchMsg.queryId = htobe64(1); + fetchMsg.taskId = htobe64(1); + fetchRpc.pCont = &fetchMsg; + + STaskDropMsg dropMsg = {0}; + dropMsg.schedulerId = htobe64(1); + dropMsg.queryId = htobe64(1); + dropMsg.taskId = htobe64(1); + dropRpc.pCont = &dropMsg; + + stubSetStringToPlan(); + + code = qWorkerInit(NULL, &mgmt); + ASSERT_EQ(code, 0); + + code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + ASSERT_EQ(code, 0); + + code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + ASSERT_EQ(code, 0); + +} + + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS(); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index b1c3f83e9c..2381a1dd49 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -43,7 +43,7 @@ typedef struct SSchedulerMgmt { SHashObj *jobs; // key: queryId, value: SQueryJob* } SSchedulerMgmt; -typedef struct SQueryLevel { +typedef struct SSchLevel { int32_t level; int8_t status; SRWLatch lock; @@ -51,12 +51,12 @@ typedef struct SQueryLevel { int32_t taskSucceed; int32_t taskNum; SArray *subTasks; // Element is SQueryTask -} SQueryLevel; +} SSchLevel; -typedef struct SQueryTask { +typedef struct SSchTask { uint64_t taskId; // task id - SQueryLevel *level; // level + SSchLevel *level; // level SSubplan *plan; // subplan char *msg; // operator tree int32_t msgLen; // msg length @@ -66,19 +66,20 @@ typedef struct SQueryTask { int32_t childReady; // child task ready number SArray *children; // the datasource tasks,from which to fetch the result, element is SQueryTask* SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* -} SQueryTask; +} SSchTask; -typedef struct SQueryJobAttr { +typedef struct SSchJobAttr { bool needFetch; - bool syncQuery; -} SQueryJobAttr; + bool syncSchedule; + bool queryJob; +} SSchJobAttr; -typedef struct SQueryJob { +typedef struct SSchJob { uint64_t queryId; int32_t levelNum; int32_t levelIdx; int8_t status; - SQueryJobAttr attr; + SSchJobAttr attr; SQueryProfileSummary summary; SEpSet dataSrcEps; SEpAddr resEp; @@ -88,7 +89,7 @@ typedef struct SQueryJob { int32_t userFetch; int32_t remoteFetch; - SQueryTask *fetchTask; + SSchTask *fetchTask; int32_t errCode; void *res; int32_t resNumOfRows; @@ -99,7 +100,7 @@ typedef struct SQueryJob { SArray *levels; // Element is SQueryLevel, starting from 0. SArray *subPlans; // Element is SArray*, and nested element is SSubplan. The execution level of subplan, starting from 0. -} SQueryJob; +} SSchJob; #define SCH_HAS_QNODE_IN_CLUSTER(type) (false) //TODO CLUSTER TYPE #define SCH_TASK_READY_TO_LUNCH(task) ((task)->childReady >= taosArrayGetSize((task)->children)) // MAY NEED TO ENHANCE @@ -118,7 +119,7 @@ typedef struct SQueryJob { #define SCH_UNLOCK(type, _lock) (SCH_READ == (type) ? taosRUnLockLatch(_lock) : taosWUnLockLatch(_lock)) -extern int32_t schLaunchTask(SQueryJob *job, SQueryTask *task); +extern int32_t schLaunchTask(SSchJob *job, SSchTask *task); #ifdef __cplusplus } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 17d3d800f0..668b6ec569 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -51,12 +51,12 @@ int32_t schBuildAndSendRequest(void *pRpc, const SEpSet* pMgmtEps, __taos_async_ */ } -int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { +int32_t schBuildTaskRalation(SSchJob *job, SHashObj *planToTask) { for (int32_t i = 0; i < job->levelNum; ++i) { - SQueryLevel *level = taosArrayGet(job->levels, i); + SSchLevel *level = taosArrayGet(job->levels, i); for (int32_t m = 0; m < level->taskNum; ++m) { - SQueryTask *task = taosArrayGet(level->subTasks, m); + SSchTask *task = taosArrayGet(level->subTasks, m); SSubplan *plan = task->plan; int32_t childNum = plan->pChildern ? (int32_t)taosArrayGetSize(plan->pChildern) : 0; int32_t parentNum = plan->pParents ? (int32_t)taosArrayGetSize(plan->pParents) : 0; @@ -71,7 +71,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t n = 0; n < childNum; ++n) { SSubplan **child = taosArrayGet(plan->pChildern, n); - SQueryTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); + SSchTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); if (NULL == childTask || NULL == *childTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -93,7 +93,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { for (int32_t n = 0; n < parentNum; ++n) { SSubplan **parent = taosArrayGet(plan->pParents, n); - SQueryTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); + SSchTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); if (NULL == parentTask || NULL == *parentTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -107,13 +107,13 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } } - SQueryLevel *level = taosArrayGet(job->levels, 0); - if (level->taskNum > 1) { + SSchLevel *level = taosArrayGet(job->levels, 0); + if (job->attr.queryJob && level->taskNum > 1) { qError("invalid plan info, level 0, taskNum:%d", level->taskNum); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - SQueryTask *task = taosArrayGet(level->subTasks, 0); + SSchTask *task = taosArrayGet(level->subTasks, 0); if (task->parents && taosArrayGetSize(task->parents) > 0) { qError("invalid plan info, level 0, parentNum:%d", (int32_t)taosArrayGetSize(task->parents)); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); @@ -124,7 +124,7 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } -int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { +int32_t schValidateAndBuildJob(SQueryDag *dag, SSchJob *job) { int32_t code = 0; job->queryId = dag->queryId; @@ -146,7 +146,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->levels = taosArrayInit(levelNum, sizeof(SQueryLevel)); + job->levels = taosArrayInit(levelNum, sizeof(SSchLevel)); if (NULL == job->levels) { qError("taosArrayInit %d failed", levelNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -159,10 +159,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { job->subPlans = dag->pSubplans; - SQueryLevel level = {0}; + SSchLevel level = {0}; SArray *levelPlans = NULL; int32_t levelPlanNum = 0; - SQueryLevel *pLevel = NULL; + SSchLevel *pLevel = NULL; level.status = JOB_TASK_STATUS_NOT_START; @@ -189,7 +189,7 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { pLevel->taskNum = levelPlanNum; - pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SQueryTask)); + pLevel->subTasks = taosArrayInit(levelPlanNum, sizeof(SSchTask)); if (NULL == pLevel->subTasks) { qError("taosArrayInit %d failed", levelPlanNum); SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -197,11 +197,14 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); - SQueryTask task = {0}; + SSchTask task = {0}; if (plan->type == QUERY_TYPE_MODIFY) { job->attr.needFetch = false; + } else { + job->attr.queryJob = true; } + task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; @@ -242,7 +245,7 @@ _return: SCH_RET(code); } -int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { +int32_t schSetTaskExecEpSet(SSchJob *job, SEpSet *epSet) { if (epSet->numOfEps >= SCH_MAX_CONDIDATE_EP_NUM) { return TSDB_CODE_SUCCESS; } @@ -269,7 +272,7 @@ int32_t schSetTaskExecEpSet(SQueryJob *job, SEpSet *epSet) { } -int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { +int32_t schPushTaskToExecList(SSchJob *job, SSchTask *task) { if (0 != taosHashPut(job->execTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { qError("taosHashPut failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); @@ -278,7 +281,7 @@ int32_t schPushTaskToExecList(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { +int32_t schMoveTaskToSuccList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); return TSDB_CODE_SUCCESS; @@ -294,7 +297,7 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { return TSDB_CODE_SUCCESS; } -int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { +int32_t schMoveTaskToFailList(SSchJob *job, SSchTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } @@ -310,7 +313,7 @@ int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { } -int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { +int32_t schAsyncSendMsg(SSchJob *job, SSchTask *task, int32_t msgType) { int32_t msgSize = 0; void *msg = NULL; @@ -404,7 +407,7 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { return TSDB_CODE_SUCCESS; } -int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCode, bool *needRetry) { +int32_t schTaskCheckAndSetRetry(SSchJob *job, SSchTask *task, int32_t errCode, bool *needRetry) { // TODO set retry or not based on task type/errCode/retry times/job status/available eps... // TODO if needRetry, set task retry info @@ -414,7 +417,7 @@ int32_t schTaskCheckAndSetRetry(SQueryJob *job, SQueryTask *task, int32_t errCod } -int32_t schFetchFromRemote(SQueryJob *job) { +int32_t schFetchFromRemote(SSchJob *job) { int32_t code = 0; if (atomic_val_compare_exchange_32(&job->remoteFetch, 0, 1) != 0) { @@ -433,10 +436,10 @@ _return: } -int32_t schProcessOnJobPartialSuccess(SQueryJob *job) { +int32_t schProcessOnJobPartialSuccess(SSchJob *job) { job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; - if ((!job->attr.needFetch) && job->attr.syncQuery) { + if ((!job->attr.needFetch) && job->attr.syncSchedule) { tsem_post(&job->rspSem); } @@ -447,27 +450,27 @@ int32_t schProcessOnJobPartialSuccess(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailure(SQueryJob *job, int32_t errCode) { +int32_t schProcessOnJobFailure(SSchJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; job->errCode = errCode; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); - if (job->userFetch) { + if (job->userFetch || ((!job->attr.needFetch) && job->attr.syncSchedule)) { tsem_post(&job->rspSem); } return TSDB_CODE_SUCCESS; } -int32_t schProcessOnDataFetched(SQueryJob *job) { +int32_t schProcessOnDataFetched(SSchJob *job) { atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); tsem_post(&job->rspSem); } -int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { +int32_t schProcessOnTaskSuccess(SSchJob *job, SSchTask *task) { bool moved = false; SCH_ERR_RET(schMoveTaskToSuccList(job, task, &moved)); @@ -489,7 +492,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { if (SCH_TASK_NEED_WAIT_ALL(task)) { SCH_LOCK(SCH_WRITE, &task->level->lock); - task->level->taskFailed++; + task->level->taskSucceed++; taskDone = task->level->taskSucceed + task->level->taskFailed; SCH_UNLOCK(SCH_WRITE, &task->level->lock); @@ -524,7 +527,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { } for (int32_t i = 0; i < parentNum; ++i) { - SQueryTask *par = *(SQueryTask **)taosArrayGet(task->parents, i); + SSchTask *par = *(SSchTask **)taosArrayGet(task->parents, i); ++par->childReady; @@ -538,7 +541,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCode) { +int32_t schProcessOnTaskFailure(SSchJob *job, SSchTask *task, int32_t errCode) { bool needRetry = false; bool moved = false; int32_t taskDone = 0; @@ -575,7 +578,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { +int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { @@ -584,7 +587,7 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char if (rsp->code != TSDB_CODE_SUCCESS) { SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); } else { - job->resNumOfRows += rsp->numOfRows; + job->resNumOfRows += rsp->affectedRows; code = schProcessOnTaskSuccess(job, task); if (code) { @@ -648,7 +651,7 @@ _return: -int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { +int32_t schLaunchTask(SSchJob *job, SSchTask *task) { SSubplan *plan = task->plan; SCH_ERR_RET(qSubPlanToString(plan, &task->msg, &task->msgLen)); if (plan->execEpSet.numOfEps <= 0) { @@ -671,10 +674,10 @@ int32_t schLaunchTask(SQueryJob *job, SQueryTask *task) { return TSDB_CODE_SUCCESS; } -int32_t schLaunchJob(SQueryJob *job) { - SQueryLevel *level = taosArrayGet(job->levels, job->levelIdx); +int32_t schLaunchJob(SSchJob *job) { + SSchLevel *level = taosArrayGet(job->levels, job->levelIdx); for (int32_t i = 0; i < level->taskNum; ++i) { - SQueryTask *task = taosArrayGet(level->subTasks, i); + SSchTask *task = taosArrayGet(level->subTasks, i); SCH_ERR_RET(schLaunchTask(job, task)); } @@ -683,10 +686,10 @@ int32_t schLaunchJob(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -void schDropJobAllTasks(SQueryJob *job) { +void schDropJobAllTasks(SSchJob *job) { void *pIter = taosHashIterate(job->succTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); @@ -695,7 +698,7 @@ void schDropJobAllTasks(SQueryJob *job) { pIter = taosHashIterate(job->failTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; schAsyncSendMsg(job, task, TSDB_MSG_TYPE_DROP_TASK); @@ -721,7 +724,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncQuery) { +int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncSchedule) { if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -731,12 +734,12 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, } int32_t code = 0; - SQueryJob *job = calloc(1, sizeof(SQueryJob)); + SSchJob *job = calloc(1, sizeof(SSchJob)); if (NULL == job) { SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } - job->attr.syncQuery = syncQuery; + job->attr.syncSchedule = syncSchedule; job->transport = transport; job->qnodeList = qnodeList; @@ -777,9 +780,9 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, SCH_ERR_JRET(schLaunchJob(job)); - *(SQueryJob **)pJob = job; + *(SSchJob **)pJob = job; - if (syncQuery) { + if (syncSchedule) { tsem_wait(&job->rspSem); } @@ -787,7 +790,7 @@ int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, _return: - *(SQueryJob **)pJob = NULL; + *(SSchJob **)pJob = NULL; scheduleFreeJob(job); SCH_RET(code); @@ -798,7 +801,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); - SQueryJob *job = *(SQueryJob **)pJob; + SSchJob *job = *(SSchJob **)pJob; *numOfRows = job->resNumOfRows; @@ -815,7 +818,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } - SQueryJob *job = pJob; + SSchJob *job = pJob; int32_t code = 0; if (!job->attr.needFetch) { @@ -874,7 +877,7 @@ void scheduleFreeJob(void *pJob) { return; } - SQueryJob *job = pJob; + SSchJob *job = pJob; if (job->status > 0) { if (0 != taosHashRemove(schMgmt.jobs, &job->queryId, sizeof(job->queryId))) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 6692b4b932..be6c8d225d 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -36,9 +36,9 @@ namespace { -extern "C" int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); +extern "C" int32_t schHandleRspMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); -void schtBuildDag(SQueryDag *dag) { +void schtBuildQueryDag(SQueryDag *dag) { uint64_t qId = 0x0000000000000001; dag->queryId = qId; @@ -82,6 +82,50 @@ void schtBuildDag(SQueryDag *dag) { taosArrayPush(dag->pSubplans, &scan); } +void schtBuildInsertDag(SQueryDag *dag) { + uint64_t qId = 0x0000000000000002; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(1, POINTER_BYTES); + SArray *inserta = taosArrayInit(dag->numOfSubplans, sizeof(SSubplan)); + + SSubplan insertPlan[2] = {0}; + + insertPlan[0].id.queryId = qId; + insertPlan[0].id.templateId = 0x0000000000000003; + insertPlan[0].id.subplanId = 0x0000000000000004; + insertPlan[0].type = QUERY_TYPE_MODIFY; + insertPlan[0].level = 0; + insertPlan[0].execEpSet.numOfEps = 1; + insertPlan[0].execEpSet.port[0] = 6030; + strcpy(insertPlan[0].execEpSet.fqdn[0], "ep0"); + insertPlan[0].pChildern = NULL; + insertPlan[0].pParents = NULL; + insertPlan[0].pNode = NULL; + insertPlan[0].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + + insertPlan[1].id.queryId = qId; + insertPlan[1].id.templateId = 0x0000000000000003; + insertPlan[1].id.subplanId = 0x0000000000000005; + insertPlan[1].type = QUERY_TYPE_MODIFY; + insertPlan[1].level = 0; + insertPlan[1].execEpSet.numOfEps = 1; + insertPlan[1].execEpSet.port[0] = 6030; + strcpy(insertPlan[1].execEpSet.fqdn[0], "ep1"); + insertPlan[1].pChildern = NULL; + insertPlan[1].pParents = NULL; + insertPlan[1].pNode = NULL; + insertPlan[1].pDataSink = (SDataSink*)calloc(1, sizeof(SDataSink)); + + + taosArrayPush(inserta, &insertPlan[0]); + taosArrayPush(inserta, &insertPlan[1]); + + taosArrayPush(dag->pSubplans, &inserta); +} + + int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { *str = (char *)calloc(1, 20); *len = 20; @@ -119,6 +163,35 @@ void schtSetExecNode() { } } +void *schtSendRsp(void *param) { + SSchJob *job = NULL; + int32_t code = 0; + + while (true) { + job = *(SSchJob **)param; + if (job) { + break; + } + + usleep(1000); + } + + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SShellSubmitRspMsg rsp = {0}; + rsp.affectedRows = 10; + schHandleRspMsg(job, task, TSDB_MSG_TYPE_SUBMIT, (char *)&rsp, sizeof(rsp), 0); + + pIter = taosHashIterate(job->execTasks, pIter); + } + + return NULL; +} + +void *pInsertJob = NULL; + } @@ -140,7 +213,7 @@ TEST(queryTest, normalCase) { int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - schtBuildDag(&dag); + schtBuildQueryDag(&dag); schtSetPlanToString(); schtSetExecNode(); @@ -148,10 +221,10 @@ TEST(queryTest, normalCase) { code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); - SQueryJob *job = (SQueryJob *)pJob; + SSchJob *job = (SSchJob *)pJob; void *pIter = taosHashIterate(job->execTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0); @@ -162,7 +235,7 @@ TEST(queryTest, normalCase) { pIter = taosHashIterate(job->execTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0); @@ -173,7 +246,7 @@ TEST(queryTest, normalCase) { pIter = taosHashIterate(job->execTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; SQueryTableRsp rsp = {0}; code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0); @@ -184,7 +257,7 @@ TEST(queryTest, normalCase) { pIter = taosHashIterate(job->execTasks, NULL); while (pIter) { - SQueryTask *task = *(SQueryTask **)pIter; + SSchTask *task = *(SSchTask **)pIter; SResReadyRsp rsp = {0}; code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0); @@ -219,6 +292,46 @@ TEST(queryTest, normalCase) { } + + +TEST(insertTest, normalCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + SQueryDag dag = {0}; + uint64_t numOfRows = 0; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildInsertDag(&dag); + + schtSetPlanToString(); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtSendRsp, &pInsertJob); + + code = scheduleExecJob(mockPointer, qnodeList, &dag, &pInsertJob, &numOfRows); + ASSERT_EQ(code, 0); + ASSERT_EQ(numOfRows, 20); + + scheduleFreeJob(pInsertJob); +} + + + + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();