diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index 592c5f707c..d64b1518bd 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -137,7 +137,7 @@ int32_t catalogGetTableHashVgroup(struct SCatalog* pCatalog, void * pTransporter int32_t catalogGetAllMeta(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const SCatalogReq* pReq, SMetaData* pRsp); -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet); +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList); diff --git a/include/libs/scheduler/scheduler.h b/include/libs/scheduler/scheduler.h index 975b103538..fa1483de87 100644 --- a/include/libs/scheduler/scheduler.h +++ b/include/libs/scheduler/scheduler.h @@ -59,7 +59,15 @@ int32_t schedulerInit(SSchedulerCfg *cfg); * @param qnodeList Qnode address list, element is SEpAddr * @return */ -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows); + +/** + * Process the query job, generated according to the query physical plan. + * This is a asynchronized API, and is also thread-safety. + * @param qnodeList Qnode address list, element is SEpAddr + * @return + */ +int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob); int32_t scheduleFetchRows(void *pJob, void **data); @@ -79,4 +87,4 @@ void schedulerDestroy(void); } #endif -#endif /*_TD_SCHEDULER_H_*/ \ No newline at end of file +#endif /*_TD_SCHEDULER_H_*/ diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index bc7a8b8aff..02e7558145 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -634,8 +634,8 @@ _return: CTG_RET(code); } -int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SEpSet* pQnodeEpSet) { - if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeEpSet) { +int32_t catalogGetQnodeList(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, SArray* pQnodeList) { + if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pQnodeList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 22b5b7adc4..fa63c78cbe 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -53,6 +53,12 @@ typedef struct STscObj { namespace { +int32_t ctgTestVgNum = 10; +char *ctgTestClusterId = "cluster1"; +char *ctgTestDbname = "1.db1"; +char *ctgTestTablename = "table1"; + + void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { SCreateDbMsg* pReq = (SCreateDbMsg*)rpcMallocCont(sizeof(SCreateDbMsg)); strcpy(pReq->db, "1.db1"); @@ -91,6 +97,34 @@ void sendCreateDbMsg(void *shandle, SEpSet *pEpSet) { void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { SUseDbRsp *rspMsg = NULL; //todo + pRsp->code =0; + pRsp->contLen = sizeof(SUseDbRsp) + ctgTestVgNum * sizeof(SVgroupInfo); + pRsp->pCont = calloc(1, pRsp->contLen); + rspMsg = (SUseDbRsp *)pRsp->pCont; + strcpy(rspMsg->db, ctgTestDbname); + rspMsg->vgVersion = 1; + rspMsg->vgNum = ctgTestVgNum; + rspMsg->hashMethod = 0; + + SVgroupInfo *vg = NULL; + uint32_t hashUnit = UINT32_MAX / ctgTestVgNum; + for (int32_t i = 0; i < ctgTestVgNum; ++i) { + vg = &rspMsg->vgroupInfo[i]; + + vg->vgId = i + 1; + vg->hashBegin = i * hashUnit; + vg->hashEnd = hashUnit * (i + 1) - 1; + vg->numOfEps = i % TSDB_MAX_REPLICA + 1; + vg->inUse = i % vg->numOfEps; + for (int32_t n = 0; n < vg->numOfEps; ++n) { + SEpAddrMsg *addr = &vg->epAddr[n]; + strcpy(addr->fqdn, "a0"); + addr->port = n + 22; + } + } + + vg->hashEnd = UINT32_MAX; + return; } @@ -112,30 +146,24 @@ void initTestEnv() { } TEST(testCase, normalCase) { - STscObj* pConn = (STscObj *)taos_connect("127.0.0.1", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - char *clusterId = "cluster1"; - char *dbname = "1.db1"; - char *tablename = "table1"; struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; + initTestEnv(); + initQueryModuleMsgHandle(); - sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); int32_t code = catalogInit(NULL); ASSERT_EQ(code, 0); - code = catalogGetHandle(clusterId, &pCtg); + code = catalogGetHandle(ctgTestClusterId, &pCtg); ASSERT_EQ(code, 0); - code = catalogGetTableHashVgroup(pCtg, pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet, dbname, tablename, &vgInfo); + code = catalogGetTableHashVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgInfo); ASSERT_EQ(code, 0); - - taos_close(pConn); } diff --git a/source/libs/qworker/CMakeLists.txt b/source/libs/qworker/CMakeLists.txt index 4eafa50bdc..001756d7c3 100644 --- a/source/libs/qworker/CMakeLists.txt +++ b/source/libs/qworker/CMakeLists.txt @@ -10,3 +10,5 @@ target_link_libraries( qworker PRIVATE os util transport planner qcom ) + +ADD_SUBDIRECTORY(test) \ No newline at end of file diff --git a/source/libs/qworker/test/CMakeLists.txt b/source/libs/qworker/test/CMakeLists.txt new file mode 100644 index 0000000000..6fb5e4d5c0 --- /dev/null +++ b/source/libs/qworker/test/CMakeLists.txt @@ -0,0 +1,18 @@ + +MESSAGE(STATUS "build qworker unit test") + +# GoogleTest requires at least C++11 +SET(CMAKE_CXX_STANDARD 11) +AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) + +ADD_EXECUTABLE(qworkerTest ${SOURCE_LIST}) +TARGET_LINK_LIBRARIES( + qworkerTest + PUBLIC os util common transport gtest qcom +) + +TARGET_INCLUDE_DIRECTORIES( + qworkerTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/qworker/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/qworker/inc" +) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp new file mode 100644 index 0000000000..fb1ba5c43f --- /dev/null +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -0,0 +1,71 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#pragma GCC diagnostic ignored "-Wwrite-strings" + +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" +#include "os.h" + +#include "taos.h" +#include "tdef.h" +#include "tvariant.h" +#include "tep.h" +#include "trpc.h" +#include "stub.h" +#include "addr_any.h" + + +namespace { + + +} + +void __rpcSendRecv(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + SUseDbRsp *rspMsg = NULL; //todo + + return; +} + + +void initTestEnv() { + static Stub stub; + stub.set(rpcSendRecv, __rpcSendRecv); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, __rpcSendRecv); + } + } +} + +TEST(testCase, normalCase) { + +} + + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + + + diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index bc7bc44350..03465428c0 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -68,11 +68,17 @@ typedef struct SQueryTask { SArray *parents; // the data destination tasks, get data from current task, element is SQueryTask* } SQueryTask; +typedef struct SQueryJobAttr { + bool needFetch; + bool syncQuery; +} SQueryJobAttr; + typedef struct SQueryJob { uint64_t queryId; int32_t levelNum; int32_t levelIdx; int8_t status; + SQueryJobAttr attr; SQueryProfileSummary summary; SEpSet dataSrcEps; SEpAddr resEp; @@ -81,8 +87,10 @@ typedef struct SQueryJob { tsem_t rspSem; int32_t userFetch; int32_t remoteFetch; - void *res; + void *res; + int32_t resNumOfRows; + SHashObj *execTasks; // executing tasks, key:taskid, value:SQueryTask* SHashObj *succTasks; // succeed tasks, key:taskid, value:SQueryTask* SHashObj *failTasks; // failed tasks, key:taskid, value:SQueryTask* diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 4185b3176c..6664d17c7f 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -152,6 +152,8 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { SCH_ERR_JRET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->attr.needFetch = true; + job->levelNum = levelNum; job->levelIdx = levelNum - 1; @@ -196,6 +198,10 @@ int32_t schValidateAndBuildJob(SQueryDag *dag, SQueryJob *job) { for (int32_t n = 0; n < levelPlanNum; ++n) { SSubplan *plan = taosArrayGet(levelPlans, n); SQueryTask task = {0}; + + if (plan->type == QUERY_TYPE_MODIFY) { + job->attr.needFetch = false; + } task.taskId = atomic_add_fetch_64(&schMgmt.taskId, 1); task.plan = plan; @@ -424,8 +430,12 @@ _return: } -int32_t schProcessOnJobSuccess(SQueryJob *job) { - job->status = JOB_TASK_STATUS_SUCCEED; +int32_t schProcessOnJobPartialSuccess(SQueryJob *job) { + job->status = JOB_TASK_STATUS_PARTIAL_SUCCEED; + + if ((!job->attr.needFetch) && job->attr.syncQuery) { + tsem_post(&job->rspSem); + } if (job->userFetch) { SCH_ERR_RET(schFetchFromRemote(job)); @@ -495,7 +505,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { job->resEp.port = task->execAddr.port; } - SCH_ERR_RET(schProcessOnJobSuccess(job)); + SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); return TSDB_CODE_SUCCESS; } @@ -560,34 +570,63 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod return TSDB_CODE_SUCCESS; } -int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, int32_t rspCode) { +int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode) { int32_t code = 0; switch (msgType) { - case TSDB_MSG_TYPE_QUERY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY); - if (code) { - goto _task_error; + case TSDB_MSG_TYPE_SUBMIT: { + SShellSubmitRspMsg *rsp = (SShellSubmitRspMsg *)msg; + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + job->resNumOfRows += rsp->numOfRows; + + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } } + break; } - break; - case TSDB_MSG_TYPE_RES_READY: - if (rspCode != TSDB_CODE_SUCCESS) { - SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rspCode)); - } else { - code = schProcessOnTaskSuccess(job, task); - if (code) { - goto _task_error; - } + case TSDB_MSG_TYPE_QUERY: { + SQueryTableRsp *rsp = (SQueryTableRsp *)msg; + + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + code = schAsyncSendMsg(job, task, TSDB_MSG_TYPE_RES_READY); + if (code) { + goto _task_error; + } + } + break; + } + case TSDB_MSG_TYPE_RES_READY: { + SResReadyRsp *rsp = (SResReadyRsp *)msg; + + if (rsp->code != TSDB_CODE_SUCCESS) { + SCH_ERR_JRET(schProcessOnTaskFailure(job, task, rsp->code)); + } else { + code = schProcessOnTaskSuccess(job, task); + if (code) { + goto _task_error; + } + } + break; + } + case TSDB_MSG_TYPE_FETCH: { + SCH_ERR_JRET(rspCode); + SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; + if (rsp->completed) { + job->status = JOB_TASK_STATUS_SUCCEED; + } + + job->res = rsp; + job->resNumOfRows = rsp->numOfRows; + + SCH_ERR_JRET(schProcessOnDataFetched(job)); + break; } - break; - case TSDB_MSG_TYPE_FETCH: - SCH_ERR_JRET(rspCode); - SCH_ERR_JRET(schProcessOnDataFetched(job)); - break; default: qError("unknown msg type:%d received", msgType); return TSDB_CODE_QRY_INVALID_INPUT; @@ -680,7 +719,7 @@ int32_t schedulerInit(SSchedulerCfg *cfg) { } -int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { +int32_t scheduleExecJobImpl(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, bool syncQuery) { if (NULL == transport || NULL == transport ||NULL == pDag || NULL == pDag->pSubplans || NULL == pJob) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -695,6 +734,7 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } + job->attr.syncQuery = syncQuery; job->transport = transport; job->qnodeList = qnodeList; @@ -719,10 +759,16 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi } tsem_init(&job->rspSem, 0, 0); - - if (0 != taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES)) { - qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); - SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + + code = taosHashPut(schMgmt.jobs, &job->queryId, sizeof(job->queryId), &job, POINTER_BYTES); + if (0 != code) { + if (HASH_NODE_EXIST(code)) { + qError("taosHashPut queryId:%"PRIx64" already exist", job->queryId); + SCH_ERR_JRET(TSDB_CODE_QRY_INVALID_INPUT); + } else { + qError("taosHashPut queryId:%"PRIx64" failed", job->queryId); + SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR); + } } job->status = JOB_TASK_STATUS_NOT_START; @@ -731,6 +777,10 @@ int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, voi *(SQueryJob **)pJob = job; + if (syncQuery) { + tsem_wait(&job->rspSem); + } + return TSDB_CODE_SUCCESS; _return: @@ -741,20 +791,47 @@ _return: SCH_RET(code); } +int32_t scheduleExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob, uint64_t *numOfRows) { + *numOfRows = 0; + + SCH_ERR_RET(scheduleExecJobImpl(transport, qnodeList, pDag, pJob, true)); + + SQueryJob *job = *(SQueryJob **)pJob; + + *numOfRows = job->resNumOfRows; + + return TSDB_CODE_SUCCESS; +} + +int32_t scheduleAsyncExecJob(void *transport, SArray *qnodeList, SQueryDag* pDag, void** pJob) { + return scheduleExecJobImpl(transport, qnodeList, pDag, pJob, false); +} + + int32_t scheduleFetchRows(void *pJob, void **data) { if (NULL == pJob || NULL == data) { - return TSDB_CODE_QRY_INVALID_INPUT; + SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } SQueryJob *job = pJob; int32_t code = 0; - if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { - qError("prior fetching not finished"); - return TSDB_CODE_QRY_APP_ERROR; + if (!job->attr.needFetch) { + qError("no need to fetch data"); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } if (job->status == JOB_TASK_STATUS_SUCCEED) { + job->res = NULL; + return TSDB_CODE_SUCCESS; + } + + if (atomic_val_compare_exchange_32(&job->userFetch, 0, 1) != 0) { + qError("prior fetching not finished"); + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + + if (job->status == JOB_TASK_STATUS_PARTIAL_SUCCEED) { SCH_ERR_JRET(schFetchFromRemote(job)); } @@ -766,7 +843,7 @@ int32_t scheduleFetchRows(void *pJob, void **data) { _return: atomic_val_compare_exchange_32(&job->userFetch, 1, 0); - return code; + SCH_RET(code); } int32_t scheduleCancelJob(void *pJob) { diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 9e94553058..de58862cea 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -26,72 +26,72 @@ #include "taos.h" #include "tdef.h" #include "tvariant.h" -#include "catalog.h" -#include "scheduler.h" +#include "catalog.h" +#include "scheduler.h" #include "tep.h" #include "trpc.h" - + namespace { -void mockBuildDag(SQueryDag *dag) { - uint64_t qId = 0x111111111111; - - dag->queryId = qId; - dag->numOfSubplans = 2; - dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); - SArray *scan = taosArrayInit(1, sizeof(SSubplan)); - SArray *merge = taosArrayInit(1, sizeof(SSubplan)); - - SSubplan scanPlan = {0}; - SSubplan mergePlan = {0}; - - scanPlan.id.queryId = qId; - scanPlan.id.templateId = 0x2222222222; - scanPlan.id.subplanId = 0x3333333333; - scanPlan.type = QUERY_TYPE_SCAN; - scanPlan.level = 1; - scanPlan.execEpSet.numOfEps = 1; - scanPlan.pChildern = NULL; - scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); - - mergePlan.id.queryId = qId; - mergePlan.id.templateId = 0x4444444444; - mergePlan.id.subplanId = 0x5555555555; - mergePlan.type = QUERY_TYPE_MERGE; - mergePlan.level = 0; - mergePlan.execEpSet.numOfEps = 1; - mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); - mergePlan.pParents = NULL; - - SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); - SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); - - taosArrayPush(mergePointer->pChildern, &scanPointer); - taosArrayPush(scanPointer->pParents, &mergePointer); - - taosArrayPush(dag->pSubplans, &merge); - taosArrayPush(dag->pSubplans, &scan); -} - +void mockBuildDag(SQueryDag *dag) { + uint64_t qId = 0x111111111111; + + dag->queryId = qId; + dag->numOfSubplans = 2; + dag->pSubplans = taosArrayInit(dag->numOfSubplans, POINTER_BYTES); + SArray *scan = taosArrayInit(1, sizeof(SSubplan)); + SArray *merge = taosArrayInit(1, sizeof(SSubplan)); + + SSubplan scanPlan = {0}; + SSubplan mergePlan = {0}; + + scanPlan.id.queryId = qId; + scanPlan.id.templateId = 0x2222222222; + scanPlan.id.subplanId = 0x3333333333; + scanPlan.type = QUERY_TYPE_SCAN; + scanPlan.level = 1; + scanPlan.execEpSet.numOfEps = 1; + scanPlan.pChildern = NULL; + scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); + + mergePlan.id.queryId = qId; + mergePlan.id.templateId = 0x4444444444; + mergePlan.id.subplanId = 0x5555555555; + mergePlan.type = QUERY_TYPE_MERGE; + mergePlan.level = 0; + mergePlan.execEpSet.numOfEps = 1; + mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); + mergePlan.pParents = NULL; + + SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); + SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); + + taosArrayPush(mergePointer->pChildern, &scanPointer); + taosArrayPush(scanPointer->pParents, &mergePointer); + + taosArrayPush(dag->pSubplans, &merge); + taosArrayPush(dag->pSubplans, &scan); +} + } TEST(testCase, normalCase) { - void *mockPointer = (void *)0x1; + void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; char *dbname = "1.db1"; char *tablename = "table1"; - SVgroupInfo vgInfo = {0}; - void *pJob = NULL; - SQueryDag dag = {0}; - SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); - - int32_t code = schedulerInit(NULL); + SVgroupInfo vgInfo = {0}; + void *pJob = NULL; + SQueryDag dag = {0}; + SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - - mockBuildDag(&dag); - - code = scheduleExecJob(mockPointer, qnodeList, &dag, &pJob); + + mockBuildDag(&dag); + + code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); -} +} int main(int argc, char** argv) { @@ -101,4 +101,4 @@ int main(int argc, char** argv) { - +