feature/scheduler

This commit is contained in:
dapan 2022-03-05 11:21:09 +08:00
parent b6b4563ac7
commit 6830d86670
1 changed files with 105 additions and 0 deletions

View File

@ -634,6 +634,111 @@ TEST(queryTest, normalCase) {
schedulerDestroy(); schedulerDestroy();
} }
TEST(queryTest, flowCtrlCase) {
void *mockPointer = (void *)0x1;
char *clusterId = "cluster1";
char *dbname = "1.db1";
char *tablename = "table1";
SVgroupInfo vgInfo = {0};
int64_t job = 0;
SQueryPlan dag;
schtInitLogFile();
SArray *qnodeList = taosArrayInit(1, sizeof(SEp));
SEp qnodeAddr = {0};
strcpy(qnodeAddr.fqdn, "qnode0.ep");
qnodeAddr.port = 6031;
taosArrayPush(qnodeList, &qnodeAddr);
int32_t code = schedulerInit(NULL);
ASSERT_EQ(code, 0);
schtBuildQueryDag(&dag);
schtSetPlanToString();
schtSetExecNode();
schtSetAsyncSendMsgToServer();
code = schedulerAsyncExecJob(mockPointer, qnodeList, &dag, "select * from tb", &job);
ASSERT_EQ(code, 0);
SSchJob *pJob = schAcquireJob(job);
void *pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
printf("code:%d", code);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SQueryTableRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pIter = taosHashIterate(pJob->execTasks, NULL);
while (pIter) {
SSchTask *task = *(SSchTask **)pIter;
SResReadyRsp rsp = {0};
code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0);
ASSERT_EQ(code, 0);
pIter = taosHashIterate(pJob->execTasks, pIter);
}
pthread_attr_t thattr;
pthread_attr_init(&thattr);
pthread_t thread1;
pthread_create(&(thread1), &thattr, schtCreateFetchRspThread, &job);
void *data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
SRetrieveTableRsp *pRsp = (SRetrieveTableRsp *)data;
ASSERT_EQ(pRsp->completed, 1);
ASSERT_EQ(pRsp->numOfRows, 10);
tfree(data);
data = NULL;
code = schedulerFetchRows(job, &data);
ASSERT_EQ(code, 0);
ASSERT_TRUE(data == NULL);
schReleaseJob(job);
schedulerFreeJob(job);
schtFreeQueryDag(&dag);
schedulerDestroy();
}
TEST(insertTest, normalCase) { TEST(insertTest, normalCase) {