fix: qworker ut cases

This commit is contained in:
dapan1121 2024-07-15 15:40:39 +08:00
parent a8c39cdd11
commit 50d7a458e5
1 changed files with 105 additions and 62 deletions

View File

@ -130,7 +130,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) {
fetchRpc->contLen = sizeof(SResFetchReq);
}
void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
int qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) {
dropMsg->sId = 1;
dropMsg->queryId = atomic_load_64(&qwtTestQueryId);
dropMsg->taskId = 1;
@ -164,6 +164,10 @@ int32_t qwtStringToPlan(const char *str, SSubplan **subplan) {
int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestFetchQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
if (NULL == newMsg) {
printf("malloc failed");
assert(0);
}
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg;
if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) {
@ -178,7 +182,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
}
taosWUnLockLatch(&qwtTestFetchQueueLock);
tsem_post(&qwtTestFetchSem);
if (tsem_post(&qwtTestFetchSem) < 0) {
printf("tsem_post failed, errno:%d", errno);
assert(0);
}
return 0;
}
@ -186,6 +193,10 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) {
int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
taosWLockLatch(&qwtTestQueryQueueLock);
struct SRpcMsg *newMsg = (struct SRpcMsg *)taosMemoryCalloc(1, sizeof(struct SRpcMsg));
if (NULL == newMsg) {
printf("malloc failed");
assert(0);
}
memcpy(newMsg, pMsg, sizeof(struct SRpcMsg));
qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg;
if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) {
@ -200,22 +211,34 @@ int32_t qwtPutReqToQueue(void *node, EQueueType qtype, struct SRpcMsg *pMsg) {
}
taosWUnLockLatch(&qwtTestQueryQueueLock);
tsem_post(&qwtTestQuerySem);
if (tsem_post(&qwtTestQuerySem) < 0) {
printf("tsem_post failed, errno:%d", errno);
assert(0);
}
return 0;
}
void qwtSendReqToDnode(void *pVnode, struct SEpSet *epSet, struct SRpcMsg *pReq) {}
void qwtRpcSendResponse(const SRpcMsg *pRsp) {
int qwtRpcSendResponse(const SRpcMsg *pRsp) {
int32_t code = 0;
switch (pRsp->msgType) {
case TDMT_SCH_QUERY_RSP:
case TDMT_SCH_MERGE_QUERY_RSP: {
SQueryTableRsp *rsp = (SQueryTableRsp *)pRsp->pCont;
if (pRsp->code) {
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
}
rpcFreeCont(rsp);
@ -227,13 +250,25 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
if (0 == pRsp->code && 0 == rsp->completed) {
qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
code = qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc);
if (code) {
assert(0);
return code;
}
rpcFreeCont(rsp);
return;
}
qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
code = qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
code = qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc);
if (code) {
assert(0);
return code;
}
rpcFreeCont(rsp);
break;
@ -245,9 +280,11 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) {
qwtTestCaseFinished = true;
break;
}
default:
break;
}
return;
return code;
}
int32_t qwtCreateExecTask(void *tsdb, int32_t vgId, uint64_t taskId, struct SSubplan *pPlan, qTaskInfo_t *pTaskInfo,
@ -292,6 +329,9 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock **pRes, uint64_t *useconds) {
if (endExec) {
*pRes = (SSDataBlock *)taosMemoryCalloc(1, sizeof(SSDataBlock));
if (NULL == *pRes) {
return terrno;
}
(*pRes)->info.rows = taosRand() % 1000 + 1;
} else {
*pRes = NULL;
@ -631,7 +671,7 @@ void *queryThread(void *param) {
while (!qwtTestStop) {
qwtBuildQueryReqMsg(&queryRpc);
qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); // ignore error
if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5);
}
@ -653,7 +693,7 @@ void *fetchThread(void *param) {
while (!qwtTestStop) {
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); // ignore error
if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5);
}
@ -674,8 +714,11 @@ void *dropThread(void *param) {
STaskDropReq dropMsg = {0};
while (!qwtTestStop) {
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
if (0 != qwtBuildDropReqMsg(&dropMsg, &dropRpc)) {
break;
}
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); // ignore error
if (qwtTestEnableSleep) {
taosUsleep(taosRand() % 5);
}
@ -700,7 +743,7 @@ void *qwtclientThread(void *param) {
qwtTestCaseFinished = false;
qwtBuildQueryReqMsg(&queryRpc);
qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc);
(void)qwtPutReqToQueue((void *)0x1, QUERY_QUEUE, &queryRpc); //ignore error
while (!qwtTestCaseFinished) {
taosUsleep(1);
@ -752,9 +795,9 @@ void *queryQueueThread(void *param) {
}
if (TDMT_SCH_QUERY == queryRpc->msgType) {
qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0);
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
} else if (TDMT_SCH_QUERY_CONTINUE == queryRpc->msgType) {
qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0);
(void)qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc, 0); //ignore error
} else {
printf("unknown msg in query queue, type:%d\n", queryRpc->msgType);
assert(0);
@ -810,16 +853,16 @@ void *fetchQueueThread(void *param) {
switch (fetchRpc->msgType) {
case TDMT_SCH_FETCH:
case TDMT_SCH_MERGE_FETCH:
qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0);
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break;
case TDMT_SCH_CANCEL_TASK:
//qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc, 0);
break;
case TDMT_SCH_DROP_TASK:
qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0);
(void)qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break;
case TDMT_SCH_TASK_NOTIFY:
qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0);
(void)qWorkerProcessNotifyMsg(mockPointer, mgmt, fetchRpc, 0); //ignore error
break;
default:
printf("unknown msg type:%d in fetch queue", fetchRpc->msgType);
@ -853,7 +896,7 @@ TEST(seqTest, normalCase) {
qwtBuildQueryReqMsg(&queryRpc);
qwtBuildFetchReqMsg(&qwtfetchMsg, &fetchRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
(void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
stubSetStringToPlan();
stubSetRpcSendResponse();
@ -898,7 +941,7 @@ TEST(seqTest, cancelFirst) {
qwtInitLogFile();
qwtBuildQueryReqMsg(&queryRpc);
qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc);
(void)qwtBuildDropReqMsg(&qwtdropMsg, &dropRpc); //ignore error
stubSetStringToPlan();
stubSetRpcSendResponse();
@ -954,7 +997,7 @@ TEST(seqTest, randCase) {
if (r >= 0 && r < maxr / 5) {
printf("Query,%d\n", t++);
qwtBuildQueryReqMsg(&queryRpc);
code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0);
(void)qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc, 0); //ignore error
} else if (r >= maxr / 5 && r < maxr * 2 / 5) {
// printf("Ready,%d\n", t++);
// qwtBuildReadyReqMsg(&readyMsg, &readyRpc);
@ -965,14 +1008,14 @@ TEST(seqTest, randCase) {
} else if (r >= maxr * 2 / 5 && r < maxr * 3 / 5) {
printf("Fetch,%d\n", t++);
qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc);
code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0);
(void)qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc, 0); //ignore error
if (qwtTestEnableSleep) {
taosUsleep(1);
}
} else if (r >= maxr * 3 / 5 && r < maxr * 4 / 5) {
printf("Drop,%d\n", t++);
qwtBuildDropReqMsg(&dropMsg, &dropRpc);
code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0);
(void)qwtBuildDropReqMsg(&dropMsg, &dropRpc); //ignore error
(void)qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc, 0); //ignore error
if (qwtTestEnableSleep) {
taosUsleep(1);
}
@ -1018,14 +1061,14 @@ TEST(seqTest, multithreadRand) {
ASSERT_EQ(code, 0);
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5, t6;
taosThreadCreate(&(t1), &thattr, queryThread, mgmt);
// taosThreadCreate(&(t2), &thattr, readyThread, NULL);
taosThreadCreate(&(t3), &thattr, fetchThread, NULL);
taosThreadCreate(&(t4), &thattr, dropThread, NULL);
taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, queryThread, mgmt); //ignore error
// (void)taosThreadCreate(&(t2), &thattr, readyThread, NULL); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchThread, NULL); //ignore error
(void)taosThreadCreate(&(t4), &thattr, dropThread, NULL); //ignore error
(void)taosThreadCreate(&(t6), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1083,16 +1126,16 @@ TEST(rcTest, shortExecshortDelay) {
qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1114,8 +1157,8 @@ TEST(rcTest, shortExecshortDelay) {
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
(void)tsem_post(&qwtTestQuerySem); //ignore error
(void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10);
}
@ -1166,16 +1209,16 @@ TEST(rcTest, longExecshortDelay) {
qwtTestMaxExecTaskUsec = 1000000;
qwtTestReqMaxDelayUsec = 0;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1197,8 +1240,8 @@ TEST(rcTest, longExecshortDelay) {
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
(void)tsem_post(&qwtTestQuerySem); //ignore error
(void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10);
}
@ -1249,16 +1292,16 @@ TEST(rcTest, shortExeclongDelay) {
qwtTestMaxExecTaskUsec = 0;
qwtTestReqMaxDelayUsec = 1000000;
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {
@ -1280,8 +1323,8 @@ TEST(rcTest, shortExeclongDelay) {
if (qwtTestCaseFinished) {
if (qwtTestQuitThreadNum < 3) {
tsem_post(&qwtTestQuerySem);
tsem_post(&qwtTestFetchSem);
(void)tsem_post(&qwtTestQuerySem); //ignore error
(void)tsem_post(&qwtTestFetchSem); //ignore error
taosUsleep(10);
}
@ -1327,16 +1370,16 @@ TEST(rcTest, dropTest) {
code = qWorkerInit(NODE_TYPE_VNODE, 1, &mgmt, &msgCb);
ASSERT_EQ(code, 0);
tsem_init(&qwtTestQuerySem, 0, 0);
tsem_init(&qwtTestFetchSem, 0, 0);
(void)tsem_init(&qwtTestQuerySem, 0, 0); //ignore error
(void)tsem_init(&qwtTestFetchSem, 0, 0); //ignore error
TdThreadAttr thattr;
taosThreadAttrInit(&thattr);
(void)taosThreadAttrInit(&thattr); //ignore error
TdThread t1, t2, t3, t4, t5;
taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt);
taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt);
taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt);
(void)taosThreadCreate(&(t1), &thattr, qwtclientThread, mgmt); //ignore error
(void)taosThreadCreate(&(t2), &thattr, queryQueueThread, mgmt); //ignore error
(void)taosThreadCreate(&(t3), &thattr, fetchQueueThread, mgmt); //ignore error
while (true) {
if (qwtTestDeadLoop) {