diff --git a/source/libs/scheduler/src/schJob.c b/source/libs/scheduler/src/schJob.c index 6e312f0e6f..e50ec64d54 100644 --- a/source/libs/scheduler/src/schJob.c +++ b/source/libs/scheduler/src/schJob.c @@ -66,7 +66,7 @@ FORCE_INLINE bool schJobNeedToStop(SSchJob *pJob, int8_t *pStatus) { return true; } - if ((*pJob->chkKillFp)(pJob->chkKillParam)) { + if (pJob->chkKillFp && (*pJob->chkKillFp)(pJob->chkKillParam)) { schUpdateJobErrCode(pJob, TSDB_CODE_TSC_QUERY_KILLED); return true; } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 5605a4b842..fbb1f657b0 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -54,9 +54,8 @@ namespace { -extern "C" int32_t schHandleResponseMsg(SSchJob *job, SSchTask *task, int32_t msgType, char *msg, int32_t msgSize, - int32_t rspCode); -extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t msgType, int32_t rspCode); +extern "C" int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t execId, SDataBuf *pMsg, int32_t rspCode); +extern "C" int32_t schHandleCallback(void *param, const SDataBuf *pMsg, int32_t rspCode); int64_t insertJobRefId = 0; int64_t queryJobRefId = 0; @@ -67,7 +66,7 @@ uint64_t schtQueryId = 1; bool schtTestStop = false; bool schtTestDeadLoop = false; -int32_t schtTestMTRunSec = 10; +int32_t schtTestMTRunSec = 1; int32_t schtTestPrintNum = 1000; int32_t schtStartFetch = 0; @@ -85,10 +84,69 @@ void schtInitLogFile() { } void schtQueryCb(SExecResult *pResult, void *param, int32_t code) { - assert(TSDB_CODE_SUCCESS == code); *(int32_t *)param = 1; } +int32_t schtBuildQueryRspMsg(uint32_t *msize, void** rspMsg) { + SQueryTableRsp rsp = {0}; + rsp.code = 0; + rsp.affectedRows = 0; + rsp.tbVerInfo = NULL; + + int32_t msgSize = tSerializeSQueryTableRsp(NULL, 0, &rsp); + if (msgSize < 0) { + qError("tSerializeSQueryTableRsp failed"); + return TSDB_CODE_OUT_OF_MEMORY; + } + + void *pRsp = taosMemoryCalloc(msgSize, 1); + if (NULL == pRsp) { + qError("rpcMallocCont %d failed", msgSize); + return TSDB_CODE_OUT_OF_MEMORY; + } + + if (tSerializeSQueryTableRsp(pRsp, msgSize, &rsp) < 0) { + qError("tSerializeSQueryTableRsp %d failed", msgSize); + return TSDB_CODE_OUT_OF_MEMORY; + } + + *rspMsg = pRsp; + *msize = msgSize; + + return TSDB_CODE_SUCCESS; +} + + +int32_t schtBuildFetchRspMsg(uint32_t *msize, void** rspMsg) { + SRetrieveTableRsp* rsp = (SRetrieveTableRsp*)taosMemoryCalloc(sizeof(SRetrieveTableRsp), 1); + rsp->completed = 1; + rsp->numOfRows = 10; + rsp->compLen = 0; + + *rspMsg = rsp; + *msize = sizeof(SRetrieveTableRsp); + + return TSDB_CODE_SUCCESS; +} + +int32_t schtBuildSubmitRspMsg(uint32_t *msize, void** rspMsg) { + SSubmitRsp2 submitRsp = {0}; + int32_t msgSize = 0, ret = 0; + SEncoder ec = {0}; + + tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret); + void* msg = taosMemoryCalloc(1, msgSize); + tEncoderInit(&ec, (uint8_t*)msg, msgSize); + tEncodeSSubmitRsp2(&ec, &submitRsp); + tEncoderClear(&ec); + + *rspMsg = msg; + *msize = msgSize; + + return TSDB_CODE_SUCCESS; +} + + void schtBuildQueryDag(SQueryPlan *dag) { uint64_t qId = schtQueryId; @@ -98,8 +156,8 @@ void schtBuildQueryDag(SQueryPlan *dag) { SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); - SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan)); - SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan)); + SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; @@ -113,7 +171,7 @@ void schtBuildQueryDag(SQueryPlan *dag) { scanPlan->pChildren = NULL; scanPlan->level = 1; scanPlan->pParents = nodesMakeList(); - scanPlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode)); + scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); scanPlan->msgType = TDMT_SCH_QUERY; mergePlan->id.queryId = qId; @@ -125,7 +183,7 @@ void schtBuildQueryDag(SQueryPlan *dag) { mergePlan->pChildren = nodesMakeList(); mergePlan->pParents = NULL; - mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode)); + mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE); mergePlan->msgType = TDMT_SCH_QUERY; merge->pNodeList = nodesMakeList(); @@ -151,8 +209,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); - SSubplan *scanPlan = (SSubplan *)taosMemoryCalloc(scanPlanNum, sizeof(SSubplan)); - SSubplan *mergePlan = (SSubplan *)taosMemoryCalloc(1, sizeof(SSubplan)); + SSubplan *mergePlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); merge->pNodeList = nodesMakeList(); scan->pNodeList = nodesMakeList(); @@ -160,29 +217,30 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { mergePlan->pChildren = nodesMakeList(); for (int32_t i = 0; i < scanPlanNum; ++i) { - scanPlan[i].id.queryId = qId; - scanPlan[i].id.groupId = 0x0000000000000002; - scanPlan[i].id.subplanId = 0x0000000000000003 + i; - scanPlan[i].subplanType = SUBPLAN_TYPE_SCAN; + SSubplan *scanPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + scanPlan->id.queryId = qId; + scanPlan->id.groupId = 0x0000000000000002; + scanPlan->id.subplanId = 0x0000000000000003 + i; + scanPlan->subplanType = SUBPLAN_TYPE_SCAN; - scanPlan[i].execNode.nodeId = 1 + i; - scanPlan[i].execNode.epSet.inUse = 0; - scanPlan[i].execNodeStat.tableNum = taosRand() % 30; - addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep0", 6030); - addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep1", 6030); - addEpIntoEpSet(&scanPlan[i].execNode.epSet, "ep2", 6030); - scanPlan[i].execNode.epSet.inUse = taosRand() % 3; + scanPlan->execNode.nodeId = 1 + i; + scanPlan->execNode.epSet.inUse = 0; + scanPlan->execNodeStat.tableNum = taosRand() % 30; + addEpIntoEpSet(&scanPlan->execNode.epSet, "ep0", 6030); + addEpIntoEpSet(&scanPlan->execNode.epSet, "ep1", 6030); + addEpIntoEpSet(&scanPlan->execNode.epSet, "ep2", 6030); + scanPlan->execNode.epSet.inUse = taosRand() % 3; - scanPlan[i].pChildren = NULL; - scanPlan[i].level = 1; - scanPlan[i].pParents = nodesMakeList(); - scanPlan[i].pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode)); - scanPlan[i].msgType = TDMT_SCH_QUERY; + scanPlan->pChildren = NULL; + scanPlan->level = 1; + scanPlan->pParents = nodesMakeList(); + scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + scanPlan->msgType = TDMT_SCH_QUERY; - nodesListAppend(scanPlan[i].pParents, (SNode *)mergePlan); - nodesListAppend(mergePlan->pChildren, (SNode *)(scanPlan + i)); + nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); + nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); - nodesListAppend(scan->pNodeList, (SNode *)(scanPlan + i)); + nodesListAppend(scan->pNodeList, (SNode *)scanPlan); } mergePlan->id.queryId = qId; @@ -193,7 +251,7 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { mergePlan->execNode.epSet.numOfEps = 0; mergePlan->pParents = NULL; - mergePlan->pNode = (SPhysiNode *)taosMemoryCalloc(1, sizeof(SPhysiNode)); + mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE); mergePlan->msgType = TDMT_SCH_QUERY; nodesListAppend(merge->pNodeList, (SNode *)mergePlan); @@ -211,45 +269,50 @@ void schtBuildInsertDag(SQueryPlan *dag) { dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); - - SSubplan *insertPlan = (SSubplan *)taosMemoryCalloc(2, sizeof(SSubplan)); - - insertPlan[0].id.queryId = qId; - insertPlan[0].id.groupId = 0x0000000000000003; - insertPlan[0].id.subplanId = 0x0000000000000004; - insertPlan[0].subplanType = SUBPLAN_TYPE_MODIFY; - insertPlan[0].level = 0; - - insertPlan[0].execNode.nodeId = 1; - insertPlan[0].execNode.epSet.inUse = 0; - addEpIntoEpSet(&insertPlan[0].execNode.epSet, "ep0", 6030); - - insertPlan[0].pChildren = NULL; - insertPlan[0].pParents = NULL; - insertPlan[0].pNode = NULL; - insertPlan[0].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode)); - insertPlan[0].msgType = TDMT_VND_SUBMIT; - - insertPlan[1].id.queryId = qId; - insertPlan[1].id.groupId = 0x0000000000000003; - insertPlan[1].id.subplanId = 0x0000000000000005; - insertPlan[1].subplanType = SUBPLAN_TYPE_MODIFY; - insertPlan[1].level = 0; - - insertPlan[1].execNode.nodeId = 1; - insertPlan[1].execNode.epSet.inUse = 0; - addEpIntoEpSet(&insertPlan[1].execNode.epSet, "ep0", 6030); - - insertPlan[1].pChildren = NULL; - insertPlan[1].pParents = NULL; - insertPlan[1].pNode = NULL; - insertPlan[1].pDataSink = (SDataSinkNode *)taosMemoryCalloc(1, sizeof(SDataSinkNode)); - insertPlan[1].msgType = TDMT_VND_SUBMIT; - inserta->pNodeList = nodesMakeList(); + SSubplan *insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + + insertPlan->id.queryId = qId; + insertPlan->id.groupId = 0x0000000000000003; + insertPlan->id.subplanId = 0x0000000000000004; + insertPlan->subplanType = SUBPLAN_TYPE_MODIFY; + insertPlan->level = 0; + + insertPlan->execNode.nodeId = 1; + insertPlan->execNode.epSet.inUse = 0; + addEpIntoEpSet(&insertPlan->execNode.epSet, "ep0", 6030); + + insertPlan->pChildren = NULL; + insertPlan->pParents = NULL; + insertPlan->pNode = NULL; + insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + ((SDataInserterNode*)insertPlan->pDataSink)->size = 1; + ((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + insertPlan->msgType = TDMT_VND_SUBMIT; + nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); - insertPlan += 1; + + insertPlan = (SSubplan*)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + + insertPlan->id.queryId = qId; + insertPlan->id.groupId = 0x0000000000000003; + insertPlan->id.subplanId = 0x0000000000000005; + insertPlan->subplanType = SUBPLAN_TYPE_MODIFY; + insertPlan->level = 0; + + insertPlan->execNode.nodeId = 1; + insertPlan->execNode.epSet.inUse = 0; + addEpIntoEpSet(&insertPlan->execNode.epSet, "ep0", 6030); + + insertPlan->pChildren = NULL; + insertPlan->pParents = NULL; + insertPlan->pNode = NULL; + insertPlan->pDataSink = (SDataSinkNode*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + ((SDataInserterNode*)insertPlan->pDataSink)->size = 1; + ((SDataInserterNode*)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + insertPlan->msgType = TDMT_VND_SUBMIT; + nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); nodesListAppend(dag->pSubplans, (SNode *)inserta); @@ -325,7 +388,7 @@ void schtSetRpcSendRequest() { } } -int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo) { +int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTransporterId, SMsgSendInfo *pInfo, bool persistHandle, void* rpcCtx) { if (pInfo) { taosMemoryFreeClear(pInfo->param); taosMemoryFreeClear(pInfo->msgInfo.pData); @@ -336,17 +399,17 @@ int32_t schtAsyncSendMsgToServer(void *pTransporter, SEpSet *epSet, int64_t *pTr void schtSetAsyncSendMsgToServer() { static Stub stub; - stub.set(asyncSendMsgToServer, schtAsyncSendMsgToServer); + stub.set(asyncSendMsgToServerExt, schtAsyncSendMsgToServer); { #ifdef WINDOWS AddrAny any; std::map result; - any.get_func_addr("asyncSendMsgToServer", result); + any.get_func_addr("asyncSendMsgToServerExt", result); #endif #ifdef LINUX AddrAny any("libtransport.so"); std::map result; - any.get_global_func_addr_dynsym("^asyncSendMsgToServer$", result); + any.get_global_func_addr_dynsym("^asyncSendMsgToServerExt$", result); #endif for (const auto &f : result) { stub.set(f.second, schtAsyncSendMsgToServer); @@ -374,9 +437,13 @@ void *schtSendRsp(void *param) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - SSubmitRsp rsp = {0}; - rsp.affectedRows = 10; - schHandleResponseMsg(pJob, task, TDMT_VND_SUBMIT_RSP, (char *)&rsp, sizeof(rsp), 0); + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildSubmitRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_VND_SUBMIT_RSP; + msg.pData = rmsg; + + schHandleResponseMsg(pJob, task, task->execId, &msg, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } @@ -393,11 +460,13 @@ void *schtCreateFetchRspThread(void *param) { taosSsleep(1); int32_t code = 0; - SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); - rsp->completed = 1; - rsp->numOfRows = 10; - - code = schHandleResponseMsg(pJob, pJob->fetchTask, TDMT_SCH_FETCH_RSP, (char *)rsp, sizeof(*rsp), 0); + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildFetchRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_MERGE_FETCH_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0); schReleaseJob(job); @@ -414,7 +483,7 @@ void *schtFetchRspThread(void *aa) { continue; } - taosUsleep(1); + taosUsleep(100); param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); @@ -426,10 +495,11 @@ void *schtFetchRspThread(void *aa) { rsp->completed = 1; rsp->numOfRows = 10; + dataBuf.msgType = TDMT_SCH_FETCH_RSP; dataBuf.pData = rsp; dataBuf.len = sizeof(*rsp); - code = schHandleCallback(param, &dataBuf, TDMT_SCH_FETCH_RSP, 0); + code = schHandleCallback(param, &dataBuf, 0); assert(code == 0 || code); } @@ -456,7 +526,7 @@ void *schtRunJobThread(void *aa) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SQueryPlan dag; + SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); schtInitLogFile(); @@ -470,19 +540,19 @@ void *schtRunJobThread(void *aa) { SSchJob *pJob = NULL; SSchTaskCallbackParam *param = NULL; SHashObj *execTasks = NULL; - SDataBuf dataBuf = {0}; uint32_t jobFinished = 0; int32_t queryDone = 0; while (!schtTestStop) { - schtBuildQueryDag(&dag); + schtBuildQueryDag(dag); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - SEp qnodeAddr = {0}; - strcpy(qnodeAddr.fqdn, "qnode0.ep"); - qnodeAddr.port = 6031; - taosArrayPush(qnodeList, &qnodeAddr); + SQueryNodeLoad load = {0}; + load.addr.epSet.numOfEps = 1; + strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); + load.addr.epSet.eps[0].port = 6031; + taosArrayPush(qnodeList, &load); queryDone = 0; @@ -492,7 +562,7 @@ void *schtRunJobThread(void *aa) { req.syncReq = false; req.pConn = &conn; req.pNodeList = qnodeList; - req.pDag = &dag; + req.pDag = dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; req.cbParam = &queryDone; @@ -503,7 +573,7 @@ void *schtRunJobThread(void *aa) { pJob = schAcquireJob(queryJobRefId); if (NULL == pJob) { taosArrayDestroy(qnodeList); - schtFreeQueryDag(&dag); + schtFreeQueryDag(dag); continue; } @@ -526,11 +596,14 @@ void *schtRunJobThread(void *aa) { SSchTask *task = (SSchTask *)pIter; param->taskId = task->taskId; - SQueryTableRsp rsp = {0}; - dataBuf.pData = &rsp; - dataBuf.len = sizeof(rsp); - code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0); + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleCallback(param, &msg, 0); assert(code == 0 || code); pIter = taosHashIterate(execTasks, pIter); @@ -545,11 +618,13 @@ void *schtRunJobThread(void *aa) { SSchTask *task = (SSchTask *)pIter; param->taskId = task->taskId - 1; - SQueryTableRsp rsp = {0}; - dataBuf.pData = &rsp; - dataBuf.len = sizeof(rsp); + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; - code = schHandleCallback(param, &dataBuf, TDMT_SCH_QUERY_RSP, 0); + code = schHandleCallback(param, &msg, 0); assert(code == 0 || code); pIter = taosHashIterate(execTasks, pIter); @@ -575,7 +650,6 @@ void *schtRunJobThread(void *aa) { if (0 == code) { SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; assert(pRsp->completed == 1); - assert(pRsp->numOfRows == 10); } data = NULL; @@ -587,7 +661,7 @@ void *schtRunJobThread(void *aa) { taosHashCleanup(execTasks); taosArrayDestroy(qnodeList); - schtFreeQueryDag(&dag); + schtFreeQueryDag(dag); if (++jobFinished % schtTestPrintNum == 0) { printf("jobFinished:%d\n", jobFinished); @@ -609,6 +683,7 @@ void *schtFreeJobThread(void *aa) { return NULL; } + } // namespace TEST(queryTest, normalCase) { @@ -618,21 +693,20 @@ TEST(queryTest, normalCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan dag; + SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); - memset(&dag, 0, sizeof(dag)); + SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); - - SEp qnodeAddr = {0}; - strcpy(qnodeAddr.fqdn, "qnode0.ep"); - qnodeAddr.port = 6031; - taosArrayPush(qnodeList, &qnodeAddr); + SQueryNodeLoad load = {0}; + load.addr.epSet.numOfEps = 1; + strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); + load.addr.epSet.eps[0].port = 6031; + taosArrayPush(qnodeList, &load); int32_t code = schedulerInit(); ASSERT_EQ(code, 0); - schtBuildQueryDag(&dag); + schtBuildQueryDag(dag); schtSetPlanToString(); schtSetExecNode(); @@ -645,7 +719,7 @@ TEST(queryTest, normalCase) { SSchedulerReq req = {0}; req.pConn = &conn; req.pNodeList = qnodeList; - req.pDag = &dag; + req.pDag = dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; req.cbParam = &queryDone; @@ -659,9 +733,14 @@ TEST(queryTest, normalCase) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); - + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } @@ -669,11 +748,18 @@ TEST(queryTest, normalCase) { pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; + if (JOB_TASK_STATUS_EXEC == task->status) { + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + + ASSERT_EQ(code, 0); + } - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); - - ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } @@ -703,18 +789,12 @@ TEST(queryTest, normalCase) { ASSERT_EQ(pRsp->numOfRows, 10); taosMemoryFreeClear(data); - data = NULL; - code = schedulerFetchRows(job, &req); - ASSERT_EQ(code, 0); - ASSERT_TRUE(data == NULL); - schReleaseJob(job); + + schedulerDestroy(); schedulerFreeJob(&job, 0); - schtFreeQueryDag(&dag); - - schedulerDestroy(); } TEST(queryTest, readyFirstCase) { @@ -724,21 +804,20 @@ TEST(queryTest, readyFirstCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan dag; + SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); - memset(&dag, 0, sizeof(dag)); + SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); - - SEp qnodeAddr = {0}; - strcpy(qnodeAddr.fqdn, "qnode0.ep"); - qnodeAddr.port = 6031; - taosArrayPush(qnodeList, &qnodeAddr); + SQueryNodeLoad load = {0}; + load.addr.epSet.numOfEps = 1; + strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); + load.addr.epSet.eps[0].port = 6031; + taosArrayPush(qnodeList, &load); int32_t code = schedulerInit(); ASSERT_EQ(code, 0); - schtBuildQueryDag(&dag); + schtBuildQueryDag(dag); schtSetPlanToString(); schtSetExecNode(); @@ -751,7 +830,7 @@ TEST(queryTest, readyFirstCase) { SSchedulerReq req = {0}; req.pConn = &conn; req.pNodeList = qnodeList; - req.pDag = &dag; + req.pDag = dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; req.cbParam = &queryDone; @@ -764,8 +843,13 @@ TEST(queryTest, readyFirstCase) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); @@ -775,10 +859,18 @@ TEST(queryTest, readyFirstCase) { while (pIter) { SSchTask *task = *(SSchTask **)pIter; - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + if (JOB_TASK_STATUS_EXEC == task->status) { + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + + ASSERT_EQ(code, 0); + } - ASSERT_EQ(code, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } @@ -807,18 +899,11 @@ TEST(queryTest, readyFirstCase) { ASSERT_EQ(pRsp->numOfRows, 10); taosMemoryFreeClear(data); - data = NULL; - code = schedulerFetchRows(job, &req); - ASSERT_EQ(code, 0); - ASSERT_TRUE(data == NULL); - schReleaseJob(job); - schedulerFreeJob(&job, 0); - - schtFreeQueryDag(&dag); - schedulerDestroy(); + + schedulerFreeJob(&job, 0); } TEST(queryTest, flowCtrlCase) { @@ -828,35 +913,39 @@ TEST(queryTest, flowCtrlCase) { char *tablename = "table1"; SVgroupInfo vgInfo = {0}; int64_t job = 0; - SQueryPlan dag; + SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); schtInitLogFile(); taosSeedRand(taosGetTimestampSec()); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); + + SQueryNodeLoad load = {0}; + load.addr.epSet.numOfEps = 1; + strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); + load.addr.epSet.eps[0].port = 6031; + taosArrayPush(qnodeList, &load); - SEp qnodeAddr = {0}; - strcpy(qnodeAddr.fqdn, "qnode0.ep"); - qnodeAddr.port = 6031; - taosArrayPush(qnodeList, &qnodeAddr); int32_t code = schedulerInit(); ASSERT_EQ(code, 0); - schtBuildQueryFlowCtrlDag(&dag); + schtBuildQueryFlowCtrlDag(dag); schtSetPlanToString(); schtSetExecNode(); schtSetAsyncSendMsgToServer(); + initTaskQueue(); + int32_t queryDone = 0; SRequestConnInfo conn = {0}; conn.pTrans = mockPointer; SSchedulerReq req = {0}; req.pConn = &conn; req.pNodeList = qnodeList; - req.pDag = &dag; + req.pDag = dag; req.sql = "select * from tb"; req.execFp = schtQueryCb; req.cbParam = &queryDone; @@ -866,41 +955,27 @@ TEST(queryTest, flowCtrlCase) { SSchJob *pJob = schAcquireJob(job); - bool qDone = false; - - while (!qDone) { + while (!queryDone) { void *pIter = taosHashIterate(pJob->execTasks, NULL); - if (NULL == pIter) { - break; - } - while (pIter) { SSchTask *task = *(SSchTask **)pIter; - taosHashCancelIterate(pJob->execTasks, pIter); - - if (task->lastMsgType == TDMT_SCH_QUERY) { - SQueryTableRsp rsp = {0}; - code = schHandleResponseMsg(pJob, task, TDMT_SCH_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); - + if (JOB_TASK_STATUS_EXEC == task->status && 0 != task->lastMsgType) { + SDataBuf msg = {0}; + void* rmsg = NULL; + schtBuildQueryRspMsg(&msg.len, &rmsg); + msg.msgType = TDMT_SCH_QUERY_RSP; + msg.pData = rmsg; + + code = schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + ASSERT_EQ(code, 0); - } else { - qDone = true; - break; } - pIter = NULL; + pIter = taosHashIterate(pJob->execTasks, pIter); } } - while (true) { - if (queryDone) { - break; - } - - taosUsleep(10000); - } - TdThreadAttr thattr; taosThreadAttrInit(&thattr); @@ -918,18 +993,11 @@ TEST(queryTest, flowCtrlCase) { ASSERT_EQ(pRsp->numOfRows, 10); taosMemoryFreeClear(data); - data = NULL; - code = schedulerFetchRows(job, &req); - ASSERT_EQ(code, 0); - ASSERT_TRUE(data == NULL); - schReleaseJob(job); - schedulerFreeJob(&job, 0); - - schtFreeQueryDag(&dag); - schedulerDestroy(); + + schedulerFreeJob(&job, 0); } TEST(insertTest, normalCase) { @@ -938,20 +1006,21 @@ TEST(insertTest, normalCase) { char *dbname = "1.db1"; char *tablename = "table1"; SVgroupInfo vgInfo = {0}; - SQueryPlan dag; + SQueryPlan* dag = (SQueryPlan*)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN); uint64_t numOfRows = 0; - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - SEp qnodeAddr = {0}; - strcpy(qnodeAddr.fqdn, "qnode0.ep"); - qnodeAddr.port = 6031; - taosArrayPush(qnodeList, &qnodeAddr); + SQueryNodeLoad load = {0}; + load.addr.epSet.numOfEps = 1; + strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); + load.addr.epSet.eps[0].port = 6031; + taosArrayPush(qnodeList, &load); int32_t code = schedulerInit(); ASSERT_EQ(code, 0); - schtBuildInsertDag(&dag); + schtBuildInsertDag(dag); schtSetPlanToString(); schtSetAsyncSendMsgToServer(); @@ -962,21 +1031,19 @@ TEST(insertTest, normalCase) { TdThread thread1; taosThreadCreate(&(thread1), &thattr, schtSendRsp, &insertJobRefId); - SExecResult res = {0}; - + int32_t queryDone = 0; SRequestConnInfo conn = {0}; conn.pTrans = mockPointer; SSchedulerReq req = {0}; req.pConn = &conn; req.pNodeList = qnodeList; - req.pDag = &dag; + req.pDag = dag; req.sql = "insert into tb values(now,1)"; req.execFp = schtQueryCb; - req.cbParam = NULL; + req.cbParam = &queryDone; code = schedulerExecJob(&req, &insertJobRefId); ASSERT_EQ(code, 0); - ASSERT_EQ(res.numOfRows, 20); schedulerFreeJob(&insertJobRefId, 0); @@ -989,7 +1056,7 @@ TEST(multiThread, forceFree) { TdThread thread1, thread2, thread3; taosThreadCreate(&(thread1), &thattr, schtRunJobThread, NULL); - taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL); +// taosThreadCreate(&(thread2), &thattr, schtFreeJobThread, NULL); taosThreadCreate(&(thread3), &thattr, schtFetchRspThread, NULL); while (true) { @@ -1002,7 +1069,7 @@ TEST(multiThread, forceFree) { } schtTestStop = true; - taosSsleep(3); + //taosSsleep(3); } int main(int argc, char **argv) {