diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 61cc8cd25d..c974ffebc4 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -634,6 +634,111 @@ TEST(queryTest, normalCase) { schedulerDestroy(); } +TEST(queryTest, flowCtrlCase) { + void *mockPointer = (void *)0x1; + char *clusterId = "cluster1"; + char *dbname = "1.db1"; + char *tablename = "table1"; + SVgroupInfo vgInfo = {0}; + int64_t job = 0; + SQueryPlan dag; + + schtInitLogFile(); + + SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); + + SEp qnodeAddr = {0}; + strcpy(qnodeAddr.fqdn, "qnode0.ep"); + qnodeAddr.port = 6031; + taosArrayPush(qnodeList, &qnodeAddr); + + int32_t code = schedulerInit(NULL); + ASSERT_EQ(code, 0); + + schtBuildQueryDag(&dag); + + schtSetPlanToString(); + schtSetExecNode(); + schtSetAsyncSendMsgToServer(); + + code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job); + ASSERT_EQ(code, 0); + + + SSchJob *pJob = schAcquireJob(job); + + void *pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + printf("code:%d", code); + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SQueryTableRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); + + ASSERT_EQ(code, 0); + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pIter = taosHashIterate(pJob->execTasks, NULL); + while (pIter) { + SSchTask *task = *(SSchTask **)pIter; + + SResReadyRsp rsp = {0}; + code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); + ASSERT_EQ(code, 0); + + pIter = taosHashIterate(pJob->execTasks, pIter); + } + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t thread1; + pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job); + + void *data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + + SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data; + ASSERT_EQ(pRsp->completed, 1); + ASSERT_EQ(pRsp->numOfRows, 10); + tfree(data); + + data = NULL; + code = schedulerFetchRows(job, &data); + ASSERT_EQ(code, 0); + ASSERT_TRUE(data == NULL); + + schReleaseJob(job); + + schedulerFreeJob(job); + + schtFreeQueryDag(&dag); + + schedulerDestroy(); +} TEST(insertTest, normalCase) {