From a62c0fda83aca172d84673bb57966c3867ad7030 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 17 Jul 2024 09:12:42 +0800 Subject: [PATCH] fix: scheduler UT issues --- source/libs/scheduler/test/schedulerTests.cpp | 190 +++++++++++++++--- 1 file changed, 159 insertions(+), 31 deletions(-) diff --git a/source/libs/scheduler/test/schedulerTests.cpp b/source/libs/scheduler/test/schedulerTests.cpp index 7bae175ba9..f906e3ec3e 100644 --- a/source/libs/scheduler/test/schedulerTests.cpp +++ b/source/libs/scheduler/test/schedulerTests.cpp @@ -75,9 +75,7 @@ int32_t schtStartFetch = 0; void schtInitLogFile() { const char *defaultLogFileNamePrefix = "taoslog"; const int32_t maxLogFileNum = 10; - rpcInit(); tsAsyncLog = 0; - rpcInit(); qDebugFlag = 159; strcpy(tsLogDir, TD_LOG_DIR_PATH); @@ -136,8 +134,13 @@ int32_t schtBuildSubmitRspMsg(uint32_t *msize, void **rspMsg) { tEncodeSize(tEncodeSSubmitRsp2, &submitRsp, msgSize, ret); void *msg = taosMemoryCalloc(1, msgSize); + if (NULL == msg) { + return terrno; + } tEncoderInit(&ec, (uint8_t *)msg, msgSize); - tEncodeSSubmitRsp2(&ec, &submitRsp); + if (tEncodeSSubmitRsp2(&ec, &submitRsp) < 0) { + return -1; + } tEncoderClear(&ec); *rspMsg = msg; @@ -152,11 +155,26 @@ void schtBuildQueryDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); + if (NULL == dag->pSubplans) { + return; + } SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == scan) { + return; + } SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == merge) { + return; + } SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == scanPlan) { + return; + } SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == mergePlan) { + return; + } scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; @@ -170,7 +188,13 @@ void schtBuildQueryDag(SQueryPlan *dag) { scanPlan->pChildren = NULL; scanPlan->level = 1; scanPlan->pParents = nodesMakeList(); + if (NULL == scanPlan->pParents) { + return; + } scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + if (NULL == scanPlan->pNode) { + return; + } scanPlan->msgType = TDMT_SCH_QUERY; mergePlan->id.queryId = qId; @@ -181,21 +205,33 @@ void schtBuildQueryDag(SQueryPlan *dag) { mergePlan->execNode.epSet.numOfEps = 0; mergePlan->pChildren = nodesMakeList(); + if (NULL == mergePlan->pChildren) { + return; + } mergePlan->pParents = NULL; mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE); + if (NULL == mergePlan->pNode) { + return; + } mergePlan->msgType = TDMT_SCH_QUERY; merge->pNodeList = nodesMakeList(); + if (NULL == merge->pNodeList) { + return; + } scan->pNodeList = nodesMakeList(); + if (NULL == scan->pNodeList) { + return; + } - nodesListAppend(merge->pNodeList, (SNode *)mergePlan); - nodesListAppend(scan->pNodeList, (SNode *)scanPlan); + (void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan); + (void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan); - nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); - nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); + (void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); + (void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); - nodesListAppend(dag->pSubplans, (SNode *)merge); - nodesListAppend(dag->pSubplans, (SNode *)scan); + (void)nodesListAppend(dag->pSubplans, (SNode *)merge); + (void)nodesListAppend(dag->pSubplans, (SNode *)scan); } void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { @@ -205,18 +241,42 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); + if (NULL == dag->pSubplans) { + return; + } SNodeListNode *scan = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == scan) { + return; + } SNodeListNode *merge = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == merge) { + return; + } SSubplan *mergePlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == mergePlan) { + return; + } merge->pNodeList = nodesMakeList(); + if (NULL == merge->pNodeList) { + return; + } scan->pNodeList = nodesMakeList(); + if (NULL == scan->pNodeList) { + return; + } mergePlan->pChildren = nodesMakeList(); + if (NULL == mergePlan->pChildren) { + return; + } for (int32_t i = 0; i < scanPlanNum; ++i) { SSubplan *scanPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == scanPlan) { + return; + } scanPlan->id.queryId = qId; scanPlan->id.groupId = 0x0000000000000002; scanPlan->id.subplanId = 0x0000000000000003 + i; @@ -233,13 +293,19 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { scanPlan->pChildren = NULL; scanPlan->level = 1; scanPlan->pParents = nodesMakeList(); + if (NULL == scanPlan->pParents) { + return; + } scanPlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN); + if (NULL == scanPlan->pNode) { + return; + } scanPlan->msgType = TDMT_SCH_QUERY; - nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); - nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); + (void)nodesListAppend(scanPlan->pParents, (SNode *)mergePlan); + (void)nodesListAppend(mergePlan->pChildren, (SNode *)scanPlan); - nodesListAppend(scan->pNodeList, (SNode *)scanPlan); + (void)nodesListAppend(scan->pNodeList, (SNode *)scanPlan); } mergePlan->id.queryId = qId; @@ -251,12 +317,15 @@ void schtBuildQueryFlowCtrlDag(SQueryPlan *dag) { mergePlan->pParents = NULL; mergePlan->pNode = (SPhysiNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_MERGE); + if (NULL == mergePlan->pNode) { + return; + } mergePlan->msgType = TDMT_SCH_QUERY; - nodesListAppend(merge->pNodeList, (SNode *)mergePlan); + (void)nodesListAppend(merge->pNodeList, (SNode *)mergePlan); - nodesListAppend(dag->pSubplans, (SNode *)merge); - nodesListAppend(dag->pSubplans, (SNode *)scan); + (void)nodesListAppend(dag->pSubplans, (SNode *)merge); + (void)nodesListAppend(dag->pSubplans, (SNode *)scan); } void schtFreeQueryDag(SQueryPlan *dag) {} @@ -267,10 +336,22 @@ void schtBuildInsertDag(SQueryPlan *dag) { dag->queryId = qId; dag->numOfSubplans = 2; dag->pSubplans = nodesMakeList(); + if (NULL == dag->pSubplans) { + return; + } SNodeListNode *inserta = (SNodeListNode *)nodesMakeNode(QUERY_NODE_NODE_LIST); + if (NULL == inserta) { + return; + } inserta->pNodeList = nodesMakeList(); + if (NULL == inserta->pNodeList) { + return; + } SSubplan *insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == insertPlan) { + return; + } insertPlan->id.queryId = qId; insertPlan->id.groupId = 0x0000000000000003; @@ -286,13 +367,22 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan->pParents = NULL; insertPlan->pNode = NULL; insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + if (NULL == insertPlan->pDataSink) { + return; + } ((SDataInserterNode *)insertPlan->pDataSink)->size = 1; ((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) { + return; + } insertPlan->msgType = TDMT_VND_SUBMIT; - nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); + (void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); insertPlan = (SSubplan *)nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); + if (NULL == insertPlan) { + return; + } insertPlan->id.queryId = qId; insertPlan->id.groupId = 0x0000000000000003; @@ -308,22 +398,31 @@ void schtBuildInsertDag(SQueryPlan *dag) { insertPlan->pParents = NULL; insertPlan->pNode = NULL; insertPlan->pDataSink = (SDataSinkNode *)nodesMakeNode(QUERY_NODE_PHYSICAL_PLAN_INSERT); + if (NULL == insertPlan->pDataSink) { + return; + } ((SDataInserterNode *)insertPlan->pDataSink)->size = 1; ((SDataInserterNode *)insertPlan->pDataSink)->pData = taosMemoryCalloc(1, 1); + if (NULL == ((SDataInserterNode *)insertPlan->pDataSink)->pData) { + return; + } insertPlan->msgType = TDMT_VND_SUBMIT; - nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); + (void)nodesListAppend(inserta->pNodeList, (SNode *)insertPlan); - nodesListAppend(dag->pSubplans, (SNode *)inserta); + (void)nodesListAppend(dag->pSubplans, (SNode *)inserta); } int32_t schtPlanToString(const SSubplan *subplan, char **str, int32_t *len) { *str = (char *)taosMemoryCalloc(1, 20); + if (NULL == *str) { + return -1; + } *len = 20; return 0; } -void schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) {} +int32_t schtExecNode(SSubplan *subplan, uint64_t groupId, SQueryNodeAddr *ep) { return 0; } void schtRpcSendRequest(void *shandle, const SEpSet *pEpSet, SRpcMsg *pMsg, int64_t *pRid) {} @@ -442,16 +541,16 @@ void *schtSendRsp(void *param) { SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildSubmitRspMsg(&msg.len, &rmsg); + (void)schtBuildSubmitRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_VND_SUBMIT_RSP; msg.pData = rmsg; - schHandleResponseMsg(pJob, task, task->execId, &msg, 0); + (void)schHandleResponseMsg(pJob, task, task->execId, &msg, 0); pIter = taosHashIterate(pJob->execTasks, pIter); } - schReleaseJob(job); + (void)schReleaseJob(job); schtJobDone = true; @@ -472,13 +571,13 @@ void *schtCreateFetchRspThread(void *param) { int32_t code = 0; SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildFetchRspMsg(&msg.len, &rmsg); + (void)schtBuildFetchRspMsg(&msg.len, &rmsg); msg.msgType = TDMT_SCH_MERGE_FETCH_RSP; msg.pData = rmsg; code = schHandleResponseMsg(pJob, pJob->fetchTask, pJob->fetchTask->execId, &msg, 0); - schReleaseJob(job); + (void)schReleaseJob(job); assert(code == 0); return NULL; @@ -496,12 +595,17 @@ void *schtFetchRspThread(void *aa) { taosUsleep(100); param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); - + if (NULL == param) { + return NULL; + } param->queryId = schtQueryId; param->taskId = schtFetchTaskId; int32_t code = 0; SRetrieveTableRsp *rsp = (SRetrieveTableRsp *)taosMemoryCalloc(1, sizeof(SRetrieveTableRsp)); + if (NULL == rsp) { + return NULL; + } rsp->completed = 1; rsp->numOfRows = 10; @@ -557,12 +661,17 @@ void *schtRunJobThread(void *aa) { schtBuildQueryDag(dag); SArray *qnodeList = taosArrayInit(1, sizeof(SQueryNodeLoad)); - + if (NULL == qnodeList) { + assert(0); + } + SQueryNodeLoad load = {0}; load.addr.epSet.numOfEps = 1; strcpy(load.addr.epSet.eps[0].fqdn, "qnode0.ep"); load.addr.epSet.eps[0].port = 6031; - taosArrayPush(qnodeList, &load); + if (NULL == taosArrayPush(qnodeList, &load)) { + assert(0); + } queryDone = 0; @@ -590,16 +699,24 @@ void *schtRunJobThread(void *aa) { } execTasks = taosHashInit(5, taosGetDefaultHashFunction(TSDB_DATA_TYPE_UBIGINT), false, HASH_ENTRY_LOCK); + if (NULL == execTasks) { + assert(0); + } void *pIter = taosHashIterate(pJob->execTasks, NULL); while (pIter) { SSchTask *task = *(SSchTask **)pIter; schtFetchTaskId = task->taskId - 1; - taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task)); + if (taosHashPut(execTasks, &task->taskId, sizeof(task->taskId), task, sizeof(*task))) { + assert(0); + } pIter = taosHashIterate(pJob->execTasks, pIter); } param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); + if (NULL == param) { + assert(0); + } param->refId = queryJobRefId; param->queryId = pJob->queryId; @@ -611,7 +728,9 @@ void *schtRunJobThread(void *aa) { SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildQueryRspMsg(&msg.len, &rmsg); + if (schtBuildQueryRspMsg(&msg.len, &rmsg)) { + assert(0); + } msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; @@ -622,6 +741,9 @@ void *schtRunJobThread(void *aa) { } param = (SSchTaskCallbackParam *)taosMemoryCalloc(1, sizeof(*param)); + if (NULL == param) { + assert(0); + } param->refId = queryJobRefId; param->queryId = pJob->queryId; @@ -632,7 +754,9 @@ void *schtRunJobThread(void *aa) { param->taskId = task->taskId - 1; SDataBuf msg = {0}; void *rmsg = NULL; - schtBuildQueryRspMsg(&msg.len, &rmsg); + if (schtBuildQueryRspMsg(&msg.len, &rmsg)) { + assert(0); + } msg.msgType = TDMT_SCH_QUERY_RSP; msg.pData = rmsg; @@ -1110,13 +1234,17 @@ TEST(otherTest, otherCase) { schReleaseJob(0); schFreeRpcCtx(NULL); - ASSERT_EQ(schDumpEpSet(NULL, NULL), TSDB_CODE_SUCCESS); + char* ep = NULL; + ASSERT_EQ(schDumpEpSet(NULL, &ep), TSDB_CODE_SUCCESS); ASSERT_EQ(strcmp(schGetOpStr(SCH_OP_NULL), "NULL"), 0); ASSERT_EQ(strcmp(schGetOpStr((SCH_OP_TYPE)100), "UNKNOWN"), 0); } int main(int argc, char **argv) { schtInitLogFile(); + if (rpcInit()) { + assert(0); + } taosSeedRand(taosGetTimestampSec()); testing::InitGoogleTest(&argc, argv); return RUN_ALL_TESTS();