From 50d7a458e524b00e3c2e70b6240aaf2d207b55ac Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Mon, 15 Jul 2024 15:40:39 +0800 Subject: [PATCH] fix: qworker ut cases --- source/libs/qworker/test/qworkerTests.cpp | 167 ++++++++++++++-------- 1 file changed, 105 insertions(+), 62 deletions(-) diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 4a0d74a6e3..b60e285cb4 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -130,7 +130,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { fetchRpc->contLen = sizeof(SResFetchReq); } -void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { +int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropMsg->sId = 1; dropMsg->queryId = atomic_load_64(&qwtTestQueryId); dropMsg->taskId = 1; @@ -164,6 +164,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestFetchQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { @@ -178,7 +182,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestFetchQueueLock); - tsem_post(&qwtTestFetchSem); + if (tsem_post(&qwtTestFetchSem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } @@ -186,6 +193,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestQueryQueueLock); struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg)); + if (NULL == newMsg) { + printf("malloc failed"); + assert(0); + } memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { @@ -200,22 +211,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) { } taosWUnLockLatch(&qwtTestQueryQueueLock); - tsem_post(&qwtTestQuerySem); + if (tsem_post(&qwtTestQuerySem) < 0) { + printf("tsem_post failed, errno:%d", errno); + assert(0); + } return 0; } void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {} -void qwtRpcSendResponse(const SRpcMsg *pRsp) { +int qwtRpcSendResponse(const SRpcMsg *pRsp) { + int32_t code = 0; switch (pRsp->msgType) { case TDMT_SCH_QUERY_RSP: case TDMT_SCH_MERGE_QUERY_RSP: { SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont; if (pRsp->code) { - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } } rpcFreeCont(rsp); @@ -227,13 +250,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); return; } - qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); + if (code) { + assert(0); + return code; + } + code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + if (code) { + assert(0); + return code; + } rpcFreeCont(rsp); break; @@ -245,9 +280,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtTestCaseFinished = true; break; } + default: + break; } - return; + return code; } int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo, @@ -292,6 +329,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) { if (endExec) { *pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock)); + if (NULL == *pRes) { + return terrno; + } (*pRes)->info.rows = taosRand() % 1000 + 1; } else { *pRes = NULL; @@ -631,7 +671,7 @@ void *queryThread(void *param) { while (!qwtTestStop) { qwtBuildQueryReqMsg(&queryRpc); - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -653,7 +693,7 @@ void *fetchThread(void *param) { while (!qwtTestStop) { qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -674,8 +714,11 @@ void *dropThread(void *param) { STaskDropReq dropMsg = {0}; while (!qwtTestStop) { - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) { + break; + } + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error + if (qwtTestEnableSleep) { taosUsleep(taosRand() % 5); } @@ -700,7 +743,7 @@ void *qwtclientThread(void *param) { qwtTestCaseFinished = false; qwtBuildQueryReqMsg(&queryRpc); - qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); + (void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error while (!qwtTestCaseFinished) { taosUsleep(1); @@ -752,9 +795,9 @@ void *queryQueueThread(void *param) { } if (TDMT_SCH_QUERY == queryRpc->msgType) { - qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) { - qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); + (void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error } else { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); @@ -810,16 +853,16 @@ void *fetchQueueThread(void *param) { switch (fetchRpc->msgType) { case TDMT_SCH_FETCH: case TDMT_SCH_MERGE_FETCH: - qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_CANCEL_TASK: //qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0); break; case TDMT_SCH_DROP_TASK: - qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; case TDMT_SCH_TASK_NOTIFY: - qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); + (void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); @@ -853,7 +896,7 @@ TEST(seqTest, normalCase) { qwtBuildQueryReqMsg(&queryRpc); qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -898,7 +941,7 @@ TEST(seqTest, cancelFirst) { qwtInitLogFile(); qwtBuildQueryReqMsg(&queryRpc); - qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); + (void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error stubSetStringToPlan(); stubSetRpcSendResponse(); @@ -954,7 +997,7 @@ TEST(seqTest, randCase) { if (r >= 0 && r < maxr / 5) { printf("Query,%d\n", t++); qwtBuildQueryReqMsg(&queryRpc); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); + (void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error } else if (r >= maxr / 5 && r < maxr * 2 / 5) { // printf("Ready,%d\n", t++); // qwtBuildReadyReqMsg(&readyMsg, &readyRpc); @@ -965,14 +1008,14 @@ TEST(seqTest, randCase) { } else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); + (void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } } else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) { printf("Drop,%d\n", t++); - qwtBuildDropReqMsg(&dropMsg, &dropRpc); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); + (void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error + (void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error if (qwtTestEnableSleep) { taosUsleep(1); } @@ -1018,14 +1061,14 @@ TEST(seqTest, multithreadRand) { ASSERT_EQ(code, 0); TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5, t6; - taosThreadCreate(&(t1), &thattr, queryThread, mgmt); - // taosThreadCreate(&(t2), &thattr, readyThread, NULL); - taosThreadCreate(&(t3), &thattr, fetchThread, NULL); - taosThreadCreate(&(t4), &thattr, dropThread, NULL); - taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error + // (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error + (void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error + (void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1083,16 +1126,16 @@ TEST(rcTest, shortExecshortDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1114,8 +1157,8 @@ TEST(rcTest, shortExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1166,16 +1209,16 @@ TEST(rcTest, longExecshortDelay) { qwtTestMaxExecTaskUsec = 1000000; qwtTestReqMaxDelayUsec = 0; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1197,8 +1240,8 @@ TEST(rcTest, longExecshortDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1249,16 +1292,16 @@ TEST(rcTest, shortExeclongDelay) { qwtTestMaxExecTaskUsec = 0; qwtTestReqMaxDelayUsec = 1000000; - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) { @@ -1280,8 +1323,8 @@ TEST(rcTest, shortExeclongDelay) { if (qwtTestCaseFinished) { if (qwtTestQuitThreadNum < 3) { - tsem_post(&qwtTestQuerySem); - tsem_post(&qwtTestFetchSem); + (void)tsem_post(&qwtTestQuerySem); //ignore error + (void)tsem_post(&qwtTestFetchSem); //ignore error taosUsleep(10); } @@ -1327,16 +1370,16 @@ TEST(rcTest, dropTest) { code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb); ASSERT_EQ(code, 0); - tsem_init(&qwtTestQuerySem, 0, 0); - tsem_init(&qwtTestFetchSem, 0, 0); + (void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error + (void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error TdThreadAttr thattr; - taosThreadAttrInit(&thattr); + (void)taosThreadAttrInit(&thattr); //ignore error TdThread t1, t2, t3, t4, t5; - taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); - taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); - taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); + (void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error + (void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error + (void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error while (true) { if (qwtTestDeadLoop) {