From 13d440677441e7ecc14aad3bf1cd01af27077a58 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 25 Dec 2021 21:29:42 +0800 Subject: [PATCH] catalog ut --- include/libs/catalog/catalog.h | 2 +- source/libs/catalog/src/catalog.c | 28 ++- source/libs/catalog/test/catalogTests.cpp | 160 +++++++++++++++--- source/libs/scheduler/inc/schedulerInt.h | 2 + source/libs/scheduler/src/scheduler.c | 59 ++++--- source/libs/scheduler/test/schedulerTests.cpp | 139 ++++++++++++++- 6 files changed, 335 insertions(+), 55 deletions(-) diff --git a/include/libs/catalog/catalog.h b/include/libs/catalog/catalog.h index d64b1518bd..cb699d83bc 100644 --- a/include/libs/catalog/catalog.h +++ b/include/libs/catalog/catalog.h @@ -110,7 +110,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const * @param pVgroupList (output, vgroup info list, element is SVgroupInfo, NEED to simply free the array by caller) * @return error code */ -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList); +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList); /** * Get a table's vgroup from its name's hash value. diff --git a/source/libs/catalog/src/catalog.c b/source/libs/catalog/src/catalog.c index 4f03ff1ef7..6efb7032ec 100644 --- a/source/libs/catalog/src/catalog.c +++ b/source/libs/catalog/src/catalog.c @@ -197,15 +197,21 @@ int32_t ctgGetHashFunction(int8_t hashMethod, tableNameHashFp *fp) { return TSDB_CODE_SUCCESS; } -int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray* vgroupList) { +int32_t ctgGetVgInfoFromDB(struct SCatalog *pCatalog, void *pRpc, const SEpSet *pMgmtEps, SDBVgroupInfo *dbInfo, SArray** vgroupList) { SHashObj *vgroupHash = NULL; SVgroupInfo *vgInfo = NULL; + *vgroupList = taosArrayInit(taosHashGetSize(dbInfo->vgInfo), sizeof(SVgroupInfo)); + if (NULL == *vgroupList) { + ctgError("taosArrayInit failed"); + CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); + } + void *pIter = taosHashIterate(dbInfo->vgInfo, NULL); while (pIter) { vgInfo = pIter; - if (NULL == taosArrayPush(vgroupList, vgInfo)) { + if (NULL == taosArrayPush(*vgroupList, vgInfo)) { ctgError("taosArrayPush failed"); CTG_ERR_RET(TSDB_CODE_CTG_MEM_ERROR); } @@ -522,7 +528,7 @@ int32_t catalogRenewAndGetTableMeta(struct SCatalog* pCatalog, void *pRpc, const return ctgGetTableMetaImpl(pCatalog, pRpc, pMgmtEps, pDBName, pTableName, true, pTableMeta); } -int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray* pVgroupList) { +int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const SEpSet* pMgmtEps, const char* pDBName, const char* pTableName, SArray** pVgroupList) { if (NULL == pCatalog || NULL == pRpc || NULL == pMgmtEps || NULL == pDBName || NULL == pTableName || NULL == pVgroupList) { CTG_ERR_RET(TSDB_CODE_CTG_INVALID_INPUT); } @@ -542,17 +548,29 @@ int32_t catalogGetTableDistVgroup(struct SCatalog* pCatalog, void *pRpc, const S int32_t vgId = tbMeta->vgId; if (NULL == taosHashGetClone(dbVgroup.vgInfo, &vgId, sizeof(vgId), &vgroupInfo)) { ctgError("vgId[%d] not found in vgroup list", vgId); - CTG_ERR_RET(TSDB_CODE_CTG_INTERNAL_ERROR); + CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } - if (NULL == taosArrayPush(pVgroupList, &vgroupInfo)) { + *pVgroupList = taosArrayInit(1, sizeof(SVgroupInfo)); + if (NULL == *pVgroupList) { + ctgError("taosArrayInit failed"); + CTG_ERR_JRET(TSDB_CODE_CTG_MEM_ERROR); + } + + if (NULL == taosArrayPush(*pVgroupList, &vgroupInfo)) { ctgError("push vgroupInfo to array failed"); CTG_ERR_JRET(TSDB_CODE_CTG_INTERNAL_ERROR); } } + tfree(tbMeta); + + return TSDB_CODE_SUCCESS; + _return: tfree(tbMeta); + + taosArrayDestroy(*pVgroupList); CTG_RET(code); } diff --git a/source/libs/catalog/test/catalogTests.cpp b/source/libs/catalog/test/catalogTests.cpp index 66e1e0ab74..d0ca4b9611 100644 --- a/source/libs/catalog/test/catalogTests.cpp +++ b/source/libs/catalog/test/catalogTests.cpp @@ -36,6 +36,7 @@ namespace { +void ctgTestSetPrepareTableMeta(); void ctgTestSetPrepareCTableMeta(); void ctgTestSetPrepareSTableMeta(); @@ -122,21 +123,7 @@ void ctgTestPrepareDbVgroups(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcM return; } -void ctgTestPrepareDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); - - ctgTestSetPrepareCTableMeta(); - - return; -} -void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { - ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); - - ctgTestSetPrepareSTableMeta(); - - return; -} void ctgTestPrepareTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { @@ -262,6 +249,32 @@ void ctgTestPrepareSTableMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpc return; } +void ctgTestPrepareDbVgroupsAndNormalMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareTableMeta(); + + return; +} + + +void ctgTestPrepareDbVgroupsAndChildMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareCTableMeta(); + + return; +} + +void ctgTestPrepareDbVgroupsAndSuperMeta(void *shandle, SEpSet *pEpSet, SRpcMsg *pMsg, SRpcMsg *pRsp) { + ctgTestPrepareDbVgroups(shandle, pEpSet, pMsg, pRsp); + + ctgTestSetPrepareSTableMeta(); + + return; +} + + void ctgTestSetPrepareDbVgroups() { static Stub stub; @@ -315,6 +328,19 @@ void ctgTestSetPrepareSTableMeta() { } } +void ctgTestSetPrepareDbVgroupsAndNormalMeta() { + static Stub stub; + stub.set(rpcSendRecv, ctgTestPrepareDbVgroupsAndNormalMeta); + { + AddrAny any("libtransport.so"); + std::map result; + any.get_global_func_addr_dynsym("^rpcSendRecv$", result); + for (const auto& f : result) { + stub.set(f.second, ctgTestPrepareDbVgroupsAndNormalMeta); + } + } +} + void ctgTestSetPrepareDbVgroupsAndChildMeta() { static Stub stub; @@ -324,7 +350,7 @@ void ctgTestSetPrepareDbVgroupsAndChildMeta() { std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto& f : result) { - stub.set(f.second, (rpcSendRecv, ctgTestPrepareDbVgroupsAndChildMeta)); + stub.set(f.second, ctgTestPrepareDbVgroupsAndChildMeta); } } } @@ -337,7 +363,7 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() { std::map result; any.get_global_func_addr_dynsym("^rpcSendRecv$", result); for (const auto& f : result) { - stub.set(f.second, (rpcSendRecv, ctgTestPrepareDbVgroupsAndSuperMeta)); + stub.set(f.second, ctgTestPrepareDbVgroupsAndSuperMeta); } } } @@ -345,7 +371,7 @@ void ctgTestSetPrepareDbVgroupsAndSuperMeta() { } -TEST(testCase, normalTableCase) { +TEST(tableMeta, normalTable) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; @@ -396,7 +422,7 @@ TEST(testCase, normalTableCase) { catalogDestroy(); } -TEST(testCase, childTableCase) { +TEST(tableMeta, childTableCase) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; @@ -452,7 +478,7 @@ TEST(testCase, childTableCase) { catalogDestroy(); } -TEST(testCase, superTableCase) { +TEST(tableMeta, superTableCase) { struct SCatalog* pCtg = NULL; void *mockPointer = (void *)0x1; SVgroupInfo vgInfo = {0}; @@ -496,7 +522,7 @@ TEST(testCase, superTableCase) { ASSERT_EQ(tableMeta->tableInfo.rowSize, 12); tableMeta = NULL; - code = catalogGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); + code = catalogRenewAndGetTableMeta(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &tableMeta); ASSERT_EQ(code, 0); ASSERT_EQ(tableMeta->vgId, 9); ASSERT_EQ(tableMeta->tableType, TSDB_CHILD_TABLE); @@ -512,6 +538,100 @@ TEST(testCase, superTableCase) { catalogDestroy(); } +TEST(tableDistVgroup, normalTable) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndNormalMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 8); + ASSERT_EQ(vgInfo->numOfEps, 3); + + catalogDestroy(); +} + +TEST(tableDistVgroup, childTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndChildMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestCTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 9); + ASSERT_EQ(vgInfo->numOfEps, 4); + + + catalogDestroy(); +} + +TEST(tableDistVgroup, superTableCase) { + struct SCatalog* pCtg = NULL; + void *mockPointer = (void *)0x1; + SVgroupInfo *vgInfo = NULL; + SArray *vgList = NULL; + + ctgTestSetPrepareDbVgroupsAndSuperMeta(); + + initQueryModuleMsgHandle(); + + //sendCreateDbMsg(pConn->pTransporter, &pConn->pAppInfo->mgmtEp.epSet); + + int32_t code = catalogInit(NULL); + ASSERT_EQ(code, 0); + + code = catalogGetHandle(ctgTestClusterId, &pCtg); + ASSERT_EQ(code, 0); + + code = catalogGetTableDistVgroup(pCtg, mockPointer, (const SEpSet *)mockPointer, ctgTestDbname, ctgTestSTablename, &vgList); + ASSERT_EQ(code, 0); + ASSERT_EQ(taosArrayGetSize((const SArray *)vgList), 10); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 0); + ASSERT_EQ(vgInfo->vgId, 1); + ASSERT_EQ(vgInfo->numOfEps, 1); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 1); + ASSERT_EQ(vgInfo->vgId, 2); + ASSERT_EQ(vgInfo->numOfEps, 2); + vgInfo = (SVgroupInfo *)taosArrayGet(vgList, 2); + ASSERT_EQ(vgInfo->vgId, 3); + ASSERT_EQ(vgInfo->numOfEps, 3); + + + catalogDestroy(); +} + + int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv); diff --git a/source/libs/scheduler/inc/schedulerInt.h b/source/libs/scheduler/inc/schedulerInt.h index 03465428c0..b1c3f83e9c 100644 --- a/source/libs/scheduler/inc/schedulerInt.h +++ b/source/libs/scheduler/inc/schedulerInt.h @@ -88,6 +88,8 @@ typedef struct SQueryJob { int32_t userFetch; int32_t remoteFetch; + SQueryTask *fetchTask; + int32_t errCode; void *res; int32_t resNumOfRows; diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 6664d17c7f..17d3d800f0 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -70,14 +70,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } for (int32_t n = 0; n < childNum; ++n) { - SSubplan *child = taosArrayGet(plan->pChildern, n); - SQueryTask *childTask = taosHashGet(planToTask, &child, POINTER_BYTES); - if (childTask) { + SSubplan **child = taosArrayGet(plan->pChildern, n); + SQueryTask **childTask = taosHashGet(planToTask, child, POINTER_BYTES); + if (NULL == childTask || NULL == *childTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->children, &childTask)) { + if (NULL == taosArrayPush(task->children, childTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -92,14 +92,14 @@ int32_t schBuildTaskRalation(SQueryJob *job, SHashObj *planToTask) { } for (int32_t n = 0; n < parentNum; ++n) { - SSubplan *parent = taosArrayGet(plan->pParents, n); - SQueryTask *parentTask = taosHashGet(planToTask, &parent, POINTER_BYTES); - if (parentTask) { + SSubplan **parent = taosArrayGet(plan->pParents, n); + SQueryTask **parentTask = taosHashGet(planToTask, parent, POINTER_BYTES); + if (NULL == parentTask || NULL == *parentTask) { qError("subplan relationship error, level:%d, taskIdx:%d, childIdx:%d", i, m, n); SCH_ERR_RET(TSDB_CODE_SCH_INTERNAL_ERROR); } - if (NULL == taosArrayPush(task->parents, &parentTask)) { + if (NULL == taosArrayPush(task->parents, parentTask)) { qError("taosArrayPush failed"); SCH_ERR_RET(TSDB_CODE_QRY_OUT_OF_MEMORY); } @@ -296,8 +296,7 @@ int32_t schMoveTaskToSuccList(SQueryJob *job, SQueryTask *task, bool *moved) { int32_t schMoveTaskToFailList(SQueryJob *job, SQueryTask *task, bool *moved) { if (0 != taosHashRemove(job->execTasks, &task->taskId, sizeof(task->taskId))) { - qWarn("remove task[%"PRIx64"] from execTasks failed", task->taskId); - return TSDB_CODE_SUCCESS; + qWarn("remove task[%"PRIx64"] from execTasks failed, it may not exist", task->taskId); } if (0 != taosHashPut(job->failTasks, &task->taskId, sizeof(task->taskId), &task, POINTER_BYTES)) { @@ -363,6 +362,10 @@ int32_t schAsyncSendMsg(SQueryJob *job, SQueryTask *task, int32_t msgType) { break; } case TSDB_MSG_TYPE_FETCH: { + if (NULL == task) { + SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); + } + msgSize = sizeof(SResFetchMsg); msg = calloc(1, msgSize); if (NULL == msg) { @@ -419,7 +422,7 @@ int32_t schFetchFromRemote(SQueryJob *job) { return TSDB_CODE_SUCCESS; } - SCH_ERR_JRET(schAsyncSendMsg(job, NULL, TSDB_MSG_TYPE_FETCH)); + SCH_ERR_JRET(schAsyncSendMsg(job, job->fetchTask, TSDB_MSG_TYPE_FETCH)); return TSDB_CODE_SUCCESS; @@ -444,8 +447,9 @@ int32_t schProcessOnJobPartialSuccess(SQueryJob *job) { return TSDB_CODE_SUCCESS; } -int32_t schProcessOnJobFailure(SQueryJob *job) { +int32_t schProcessOnJobFailure(SQueryJob *job, int32_t errCode) { job->status = JOB_TASK_STATUS_FAILED; + job->errCode = errCode; atomic_val_compare_exchange_32(&job->remoteFetch, 1, 0); @@ -474,7 +478,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { task->status = JOB_TASK_STATUS_SUCCEED; - int32_t parentNum = (int32_t)taosArrayGetSize(task->parents); + int32_t parentNum = task->parents ? (int32_t)taosArrayGetSize(task->parents) : 0; if (parentNum == 0) { if (task->plan->level != 0) { qError("level error"); @@ -496,7 +500,7 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { if (task->level->taskFailed > 0) { job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job)); + SCH_ERR_RET(schProcessOnJobFailure(job, TSDB_CODE_QRY_APP_ERROR)); return TSDB_CODE_SUCCESS; } @@ -505,6 +509,8 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { job->resEp.port = task->execAddr.port; } + job->fetchTask = task; + SCH_ERR_RET(schProcessOnJobPartialSuccess(job)); return TSDB_CODE_SUCCESS; @@ -518,14 +524,14 @@ int32_t schProcessOnTaskSuccess(SQueryJob *job, SQueryTask *task) { } for (int32_t i = 0; i < parentNum; ++i) { - SQueryTask *par = taosArrayGet(task->parents, i); + SQueryTask *par = *(SQueryTask **)taosArrayGet(task->parents, i); ++par->childReady; SCH_ERR_RET(qSetSubplanExecutionNode(par->plan, task->plan->id.templateId, &task->execAddr)); if (SCH_TASK_READY_TO_LUNCH(par)) { - SCH_ERR_RET(schLaunchTask(job, task)); + SCH_ERR_RET(schLaunchTask(job, par)); } } @@ -544,7 +550,6 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod SCH_ERR_RET(schMoveTaskToFailList(job, task, &moved)); if (!moved) { SCH_TASK_ERR_LOG("task may already moved, status:%d", task->status); - return TSDB_CODE_SUCCESS; } if (SCH_TASK_NEED_WAIT_ALL(task)) { @@ -560,7 +565,7 @@ int32_t schProcessOnTaskFailure(SQueryJob *job, SQueryTask *task, int32_t errCod } job->status = JOB_TASK_STATUS_FAILED; - SCH_ERR_RET(schProcessOnJobFailure(job)); + SCH_ERR_RET(schProcessOnJobFailure(job, errCode)); return TSDB_CODE_SUCCESS; } @@ -617,9 +622,6 @@ int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char case TSDB_MSG_TYPE_FETCH: { SCH_ERR_JRET(rspCode); SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)msg; - if (rsp->completed) { - job->status = JOB_TASK_STATUS_SUCCEED; - } job->res = rsp; job->resNumOfRows = rsp->numOfRows; @@ -639,7 +641,7 @@ _task_error: return TSDB_CODE_SUCCESS; _return: - code = schProcessOnJobFailure(job); + code = schProcessOnJobFailure(job, code); return code; } @@ -821,6 +823,11 @@ int32_t scheduleFetchRows(void *pJob, void **data) { SCH_ERR_RET(TSDB_CODE_QRY_APP_ERROR); } + if (job->status == JOB_TASK_STATUS_FAILED) { + job->res = NULL; + SCH_RET(job->errCode); + } + if (job->status == JOB_TASK_STATUS_SUCCEED) { job->res = NULL; return TSDB_CODE_SUCCESS; @@ -837,6 +844,14 @@ int32_t scheduleFetchRows(void *pJob, void **data) { tsem_wait(&job->rspSem); + if (job->status == JOB_TASK_STATUS_FAILED) { + code = job->errCode; + } + + if (job->res && ((SRetrieveTableRsp *)job->res)->completed) { + job->status = JOB_TASK_STATUS_SUCCEED; + } + *data = job->res; job->res = NULL; diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index de58862cea..6692b4b932 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -30,10 +30,16 @@ #include "scheduler.h" #include "tep.h" #include "trpc.h" +#include "schedulerInt.h" +#include "stub.h" +#include "addr_any.h" namespace { -void mockBuildDag(SQueryDag *dag) { - uint64_t qId = 0x111111111111; + +extern "C" int32_t schHandleRspMsg(SQueryJob *job, SQueryTask *task, int32_t msgType, char *msg, int32_t msgSize, int32_t rspCode); + +void schtBuildDag(SQueryDag *dag) { + uint64_t qId = 0x0000000000000001; dag->queryId = qId; dag->numOfSubplans = 2; @@ -45,22 +51,26 @@ void mockBuildDag(SQueryDag *dag) { SSubplan mergePlan = {0}; scanPlan.id.queryId = qId; - scanPlan.id.templateId = 0x2222222222; - scanPlan.id.subplanId = 0x3333333333; + scanPlan.id.templateId = 0x0000000000000002; + scanPlan.id.subplanId = 0x0000000000000003; scanPlan.type = QUERY_TYPE_SCAN; scanPlan.level = 1; scanPlan.execEpSet.numOfEps = 1; + scanPlan.execEpSet.port[0] = 6030; + strcpy(scanPlan.execEpSet.fqdn[0], "ep0"); scanPlan.pChildern = NULL; scanPlan.pParents = taosArrayInit(1, POINTER_BYTES); + scanPlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); mergePlan.id.queryId = qId; mergePlan.id.templateId = 0x4444444444; mergePlan.id.subplanId = 0x5555555555; mergePlan.type = QUERY_TYPE_MERGE; mergePlan.level = 0; - mergePlan.execEpSet.numOfEps = 1; + mergePlan.execEpSet.numOfEps = 0; mergePlan.pChildern = taosArrayInit(1, POINTER_BYTES); mergePlan.pParents = NULL; + mergePlan.pNode = (SPhyNode*)calloc(1, sizeof(SPhyNode)); SSubplan *mergePointer = (SSubplan *)taosArrayPush(merge, &mergePlan); SSubplan *scanPointer = (SSubplan *)taosArrayPush(scan, &scanPlan); @@ -72,9 +82,47 @@ void mockBuildDag(SQueryDag *dag) { taosArrayPush(dag->pSubplans, &scan); } +int32_t schtPlanToString(const SSubplan *subplan, char** str, int32_t* len) { + *str = (char *)calloc(1, 20); + *len = 20; + return 0; } -TEST(testCase, normalCase) { +int32_t schtExecNode(SSubplan* subplan, uint64_t templateId, SEpAddr* ep) { + return 0; +} + + +void schtSetPlanToString() { + static Stub stub; + stub.set(qSubPlanToString, schtPlanToString); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qSubPlanToString$", result); + for (const auto& f : result) { + stub.set(f.second, schtPlanToString); + } + } +} + +void schtSetExecNode() { + static Stub stub; + stub.set(qSetSubplanExecutionNode, schtExecNode); + { + AddrAny any("libplanner.so"); + std::map result; + any.get_global_func_addr_dynsym("^qSetSubplanExecutionNode$", result); + for (const auto& f : result) { + stub.set(f.second, schtExecNode); + } + } +} + + +} + +TEST(queryTest, normalCase) { void *mockPointer = (void *)0x1; char *clusterId = "cluster1"; char *dbname = "1.db1"; @@ -83,14 +131,91 @@ TEST(testCase, normalCase) { void *pJob = NULL; SQueryDag dag = {0}; SArray *qnodeList = taosArrayInit(1, sizeof(SEpAddr)); + + SEpAddr qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); int32_t code = schedulerInit(NULL); ASSERT_EQ(code, 0); - mockBuildDag(&dag); + schtBuildDag(&dag); + + schtSetPlanToString(); + schtSetExecNode(); code = scheduleAsyncExecJob(mockPointer, qnodeList, &dag, &pJob); ASSERT_EQ(code, 0); + + SQueryJob *job = (SQueryJob *)pJob; + void *pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_QUERY, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(job->execTasks, pIter); + } + + pIter = taosHashIterate(job->execTasks, NULL); + while (pIter) { + SQueryTask *task = *(SQueryTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleRspMsg(job, task, TSDB_MSG_TYPE_RES_READY, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + + pIter = taosHashIterate(job->execTasks, pIter); + } + + SRetrieveTableRsp rsp = {0}; + rsp.completed = 1; + rsp.numOfRows = 10; + code = schHandleRspMsg(job, NULL, TSDB_MSG_TYPE_FETCH, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + + + void *data = NULL; + + code = scheduleFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + + data = NULL; + code = scheduleFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_EQ(data, (void*)NULL); + + scheduleFreeJob(pJob); }