From 9b4bda4f18273c1b3273fb0c9320512fd0231443 Mon Sep 17 00:00:00 2001 From: dapan Date: Sat, 5 Mar 2022 12:15:22 +0800 Subject: [PATCH] feature/scheduler --- source/libs/scheduler/src/scheduler.c | 6 +++--- source/libs/scheduler/test/schedulerTests.cpp | 21 ++++++++++++------- 2 files changed, 16 insertions(+), 11 deletions(-) diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index e6a5c285de..a5e419e00b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -114,18 +114,18 @@ int32_t schValidateTaskReceivedMsgType(SSchJob *pJob, SSchTask *pTask, int32_t m case TDMT_VND_FETCH_RSP: case TDMT_VND_DROP_TASK: if (lastMsgType != (msgType - 1)) { - SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%d, rspType:%d", lastMsgType, msgType); + SCH_TASK_ELOG("rsp msg type mis-match, last sent msgType:%s, rspType:%s", TMSG_INFO(lastMsgType), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } if (SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_EXECUTING && SCH_GET_TASK_STATUS(pTask) != JOB_TASK_STATUS_PARTIAL_SUCCEED) { - SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%d", SCH_GET_TASK_STATUS(pTask), msgType); + SCH_TASK_ELOG("rsp msg conflicted with task status, status:%d, rspType:%s", SCH_GET_TASK_STATUS(pTask), TMSG_INFO(msgType)); SCH_ERR_RET(TSDB_CODE_SCH_STATUS_ERROR); } break; default: - SCH_TASK_ELOG("unknown rsp msg, type:%d, status:%d", msgType, SCH_GET_TASK_STATUS(pTask)); + SCH_TASK_ELOG("unknown rsp msg, type:%s, status:%d", TMSG_INFO(msgType), SCH_GET_TASK_STATUS(pTask)); SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index c974ffebc4..7771fbdf0b 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -113,6 +113,9 @@ void schtBuildQueryDag(SQueryPlan *dag) { mergePlan->pNode = (SPhysiNode*)calloc(1, sizeof(SPhysiNode)); mergePlan->msgType = TDMT_VND_QUERY; + merge->pNodeList = nodesMakeList(); + scan->pNodeList = nodesMakeList(); + nodesListAppend(merge->pNodeList, (SNode*)mergePlan); nodesListAppend(scan->pNodeList, (SNode*)scanPlan); @@ -170,6 +173,8 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan[1].pDataSink = (SDataSinkNode*)calloc(1, sizeof(SDataSinkNode)); insertPlan[1].msgType = TDMT_VND_SUBMIT; + inserta->pNodeList = nodesMakeList(); + nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); insertPlan += 1; nodesListAppend(inserta->pNodeList, (SNode*)insertPlan); @@ -537,8 +542,6 @@ TEST(queryTest, normalCase) { int64_t job = 0; SQueryPlan dag; - schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0}; @@ -675,7 +678,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pIter = taosHashIterate(pJob->execTasks, NULL); @@ -686,7 +690,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); printf("code:%d", code); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pIter = taosHashIterate(pJob->execTasks, NULL); @@ -697,7 +702,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_QUERY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pIter = taosHashIterate(pJob->execTasks, NULL); @@ -708,7 +714,8 @@ TEST(queryTest, flowCtrlCase) { code = schHandleResponseMsg(pJob, task, TDMT_VND_RES_READY_RSP, (char *)&rsp, sizeof(rsp), 0); ASSERT_EQ(code, 0); - pIter = taosHashIterate(pJob->execTasks, pIter); + taosHashCancelIterate(pJob->execTasks, pIter); + pIter = NULL; } pthread_attr_t thattr; @@ -750,8 +757,6 @@ TEST(insertTest, normalCase) { SQueryPlan dag; uint64_t numOfRows = 0; - schtInitLogFile(); - SArray *qnodeList = taosArrayInit(1, sizeof(SEp)); SEp qnodeAddr = {0};