From e6f05d3889c592ab3a8d48cb16ce34ecbc053ed6 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Sat, 22 Jan 2022 22:50:00 +0800 Subject: [PATCH] feature/qnode --- source/libs/qworker/inc/qworkerInt.h | 1 + source/libs/qworker/src/qworker.c | 82 +++-- source/libs/qworker/src/qworkerMsg.c | 7 +- source/libs/qworker/test/qworkerTests.cpp | 375 +++++++++++++++++++--- source/libs/scheduler/src/scheduler.c | 2 +- 5 files changed, 409 insertions(+), 58 deletions(-) diff --git a/source/libs/qworker/inc/qworkerInt.h b/source/libs/qworker/inc/qworkerInt.h index 9ecce3f5f9..2765d7d5d7 100644 --- a/source/libs/qworker/inc/qworkerInt.h +++ b/source/libs/qworker/inc/qworkerInt.h @@ -111,6 +111,7 @@ typedef struct SQWTaskCtx { void *cancelConnection; bool emptyRes; + bool multiExec; int8_t queryContinue; int8_t queryInQueue; int32_t rspCode; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 021bd642bd..f1fd8aa6fb 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -53,6 +53,12 @@ int32_t qwValidateStatus(QW_FPARAMS_DEF, int8_t oriStatus, int8_t newStatus) { break; case JOB_TASK_STATUS_SUCCEED: + if (newStatus != JOB_TASK_STATUS_CANCELLED + && newStatus != JOB_TASK_STATUS_DROPPING) { + QW_ERR_JRET(TSDB_CODE_QRY_APP_ERROR); + } + + break; case JOB_TASK_STATUS_FAILED: case JOB_TASK_STATUS_CANCELLING: if (newStatus != JOB_TASK_STATUS_CANCELLED) { @@ -249,7 +255,7 @@ int32_t qwAcquireTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { *ctx = taosHashAcquire(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { //QW_UNLOCK(rwType, &mgmt->ctxLock); - QW_TASK_ELOG("ctx not in ctxHash, id:%s", id); + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } @@ -262,7 +268,7 @@ int32_t qwGetTaskCtx(QW_FPARAMS_DEF, SQWTaskCtx **ctx) { *ctx = taosHashGet(mgmt->ctxHash, id, sizeof(id)); if (NULL == (*ctx)) { - QW_TASK_ELOG("ctx not in ctxHash, ctxHashSize:%d", taosHashGetSize(mgmt->ctxHash)); + QW_TASK_DLOG_E("task ctx not exist, may be dropped"); QW_ERR_RET(TSDB_CODE_QRY_TASK_CTX_NOT_EXIST); } @@ -548,6 +554,8 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void return TSDB_CODE_SUCCESS; } + + pOutput->bufStatus = DS_BUF_EMPTY; QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); @@ -605,10 +613,10 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_LOCK(QW_WRITE, &ctx->lock); locked = true; - atomic_store_32(&ctx->phase, phase); - switch (phase) { case QW_PHASE_PRE_QUERY: { + atomic_store_8(&ctx->phase, phase); + atomic_store_8(&ctx->taskType, input->taskType); if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL) || QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_DROP)) { @@ -706,6 +714,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu break; } case QW_PHASE_PRE_CQUERY: { + atomic_store_8(&ctx->phase, phase); + if (QW_IS_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL)) { QW_TASK_WLOG("task already cancelled, phase:%d", phase); output->needStop = true; @@ -721,17 +731,33 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu } if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_DROP)) { - QW_TASK_ELOG("drop event at wrong phase, phase:%d", phase); + QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); + QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); + + output->rspCode = TSDB_CODE_QRY_TASK_DROPPED; output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + QW_SET_RSP_CODE(ctx, output->rspCode); + dropConnection = ctx->dropConnection; + + // Note: ctx freed, no need to unlock it + locked = false; + + QW_ERR_JRET(output->rspCode); } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { - QW_TASK_ELOG("cancel event at wrong phase, phase:%d", phase); - output->needStop = true; - output->rspCode = TSDB_CODE_QRY_TASK_STATUS_ERROR; - QW_ERR_JRET(TSDB_CODE_QRY_TASK_CANCELLED); + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); + qwFreeTask(QW_FPARAMS(), ctx); + + QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_CANCEL); + + output->needStop = true; + output->rspCode = TSDB_CODE_QRY_TASK_CANCELLED; + QW_SET_RSP_CODE(ctx, output->rspCode); + cancelConnection = ctx->cancelConnection; + + QW_ERR_JRET(output->rspCode); } + if (ctx->rspCode) { QW_TASK_ELOG("task already failed, code:%x, phase:%d", ctx->rspCode, phase); output->needStop = true; @@ -874,7 +900,9 @@ _return: QW_UPDATE_RSP_CODE(ctx, output->rspCode); } - atomic_store_32(&ctx->phase, phase); + if (QW_PHASE_POST_FETCH != phase) { + atomic_store_8(&ctx->phase, phase); + } if (locked) { QW_UNLOCK(QW_WRITE, &ctx->lock); @@ -1063,6 +1091,7 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwGetTaskCtx(QW_FPARAMS(), &ctx)); atomic_store_8(&ctx->queryInQueue, 0); + atomic_store_8(&ctx->queryContinue, 0); DataSinkHandle sinkHandle = ctx->sinkHandle; @@ -1078,6 +1107,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { // RC WARNING atomic_store_8(&ctx->queryContinue, 1); } + + if (sOutput.queryEnd) { + needStop = true; + } if (rsp) { qwBuildFetchRsp(rsp, &sOutput, dataLen); @@ -1093,6 +1126,10 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { _return: + if (NULL == ctx) { + break; + } + if (code && QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_FETCH)) { QW_SET_EVENT_PROCESSED(ctx, QW_EVENT_FETCH); qwFreeFetchRsp(rsp); @@ -1101,12 +1138,18 @@ int32_t qwProcessCQuery(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_TASK_DLOG("fetch msg rsped, code:%x, dataLen:%d", code, 0); } - input.code = code; - qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); + QW_LOCK(QW_WRITE, &ctx->lock); + if (needStop || code || 0 == atomic_load_8(&ctx->queryContinue)) { + atomic_store_8(&ctx->phase, 0); + QW_UNLOCK(QW_WRITE,&ctx->lock); + break; + } + + QW_UNLOCK(QW_WRITE,&ctx->lock); + } while (true); - needStop = output.needStop; - code = output.rspCode; - } while ((!needStop) && (0 == code) && atomic_val_compare_exchange_8(&ctx->queryContinue, 1, 0)); + input.code = code; + qwHandlePostPhaseEvents(QW_FPARAMS(), QW_PHASE_POST_CQUERY, &input, &output); QW_RET(code); } @@ -1159,7 +1202,10 @@ int32_t qwProcessFetch(QW_FPARAMS_DEF, SQWMsg *qwMsg) { if (QW_IS_QUERY_RUNNING(ctx)) { atomic_store_8(&ctx->queryContinue, 1); } else if (0 == atomic_load_8(&ctx->queryInQueue)) { - QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); + if (!ctx->multiExec) { + QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_EXECUTING)); + ctx->multiExec = true; + } atomic_store_8(&ctx->queryInQueue, 1); diff --git a/source/libs/qworker/src/qworkerMsg.c b/source/libs/qworker/src/qworkerMsg.c index baa4ad2a04..d11bee6dce 100644 --- a/source/libs/qworker/src/qworkerMsg.c +++ b/source/libs/qworker/src/qworkerMsg.c @@ -50,6 +50,7 @@ int32_t qwBuildAndSendQueryRsp(void *connection, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { + .msgType = TDMT_VND_QUERY_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -68,6 +69,7 @@ int32_t qwBuildAndSendReadyRsp(void *connection, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { + .msgType = TDMT_VND_RES_READY_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -98,7 +100,7 @@ int32_t qwBuildAndSendStatusRsp(SRpcMsg *pMsg, SSchedulerStatusRsp *sStatus) { } SRpcMsg rpcRsp = { - .msgType = pMsg->msgType + 1, + .msgType = TDMT_VND_TASKS_STATUS_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -121,6 +123,7 @@ int32_t qwBuildAndSendFetchRsp(void *connection, SRetrieveTableRsp *pRsp, int32_ } SRpcMsg rpcRsp = { + .msgType = TDMT_VND_FETCH_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -138,6 +141,7 @@ int32_t qwBuildAndSendCancelRsp(SRpcMsg *pMsg, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { + .msgType = TDMT_VND_CANCEL_TASK_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, @@ -155,6 +159,7 @@ int32_t qwBuildAndSendDropRsp(void *connection, int32_t code) { pRsp->code = code; SRpcMsg rpcRsp = { + .msgType = TDMT_VND_DROP_TASK_RSP, .handle = pMsg->handle, .ahandle = pMsg->ahandle, .pCont = pRsp, diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index 362156ebcd..5812719c51 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -38,21 +38,25 @@ namespace { -#define qwtTestQueryQueueSize 1000 -#define qwtTestFetchQueueSize 1000 -#define qwtTestMaxExecTaskUsec 2 +#define qwtTestQueryQueueSize 1000000 +#define qwtTestFetchQueueSize 1000000 + +int32_t qwtTestMaxExecTaskUsec = 2; +int32_t qwtTestReqMaxDelayUsec = 2; uint64_t qwtTestQueryId = 0; bool qwtTestEnableSleep = true; bool qwtTestStop = false; -bool qwtTestDeadLoop = true; -int32_t qwtTestMTRunSec = 10; +bool qwtTestDeadLoop = false; +int32_t qwtTestMTRunSec = 60; int32_t qwtTestPrintNum = 100000; int32_t qwtTestCaseIdx = 0; int32_t qwtTestCaseNum = 4; bool qwtTestCaseFinished = false; tsem_t qwtTestQuerySem; tsem_t qwtTestFetchSem; +int32_t qwtTestQuitThreadNum = 0; + int32_t qwtTestQueryQueueRIdx = 0; int32_t qwtTestQueryQueueWIdx = 0; @@ -104,6 +108,7 @@ void qwtBuildQueryReqMsg(SRpcMsg *queryRpc) { qwtqueryMsg.sId = htobe64(1); qwtqueryMsg.taskId = htobe64(1); qwtqueryMsg.contentLen = htonl(100); + queryRpc->msgType = TDMT_VND_QUERY; queryRpc->pCont = &qwtqueryMsg; queryRpc->contLen = sizeof(SSubQueryMsg) + 100; } @@ -112,6 +117,7 @@ void qwtBuildReadyReqMsg(SResReadyReq *readyMsg, SRpcMsg *readyRpc) { readyMsg->sId = htobe64(1); readyMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); readyMsg->taskId = htobe64(1); + readyRpc->msgType = TDMT_VND_RES_READY; readyRpc->pCont = readyMsg; readyRpc->contLen = sizeof(SResReadyReq); } @@ -120,6 +126,7 @@ void qwtBuildFetchReqMsg(SResFetchReq *fetchMsg, SRpcMsg *fetchRpc) { fetchMsg->sId = htobe64(1); fetchMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); fetchMsg->taskId = htobe64(1); + fetchRpc->msgType = TDMT_VND_FETCH; fetchRpc->pCont = fetchMsg; fetchRpc->contLen = sizeof(SResFetchReq); } @@ -128,6 +135,7 @@ void qwtBuildDropReqMsg(STaskDropReq *dropMsg, SRpcMsg *dropRpc) { dropMsg->sId = htobe64(1); dropMsg->queryId = htobe64(atomic_load_64(&qwtTestQueryId)); dropMsg->taskId = htobe64(1); + dropRpc->msgType = TDMT_VND_DROP_TASK; dropRpc->pCont = dropMsg; dropRpc->contLen = sizeof(STaskDropReq); } @@ -146,7 +154,9 @@ int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestFetchQueueLock); - qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = pMsg; + struct SRpcMsg *newMsg = (struct SRpcMsg *)calloc(1, sizeof(struct SRpcMsg)); + memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); + qwtTestFetchQueue[qwtTestFetchQueueWIdx++] = newMsg; if (qwtTestFetchQueueWIdx >= qwtTestFetchQueueSize) { qwtTestFetchQueueWIdx = 0; } @@ -167,7 +177,9 @@ int32_t qwtPutReqToFetchQueue(void *node, struct SRpcMsg *pMsg) { int32_t qwtPutReqToQueue(void *node, struct SRpcMsg *pMsg) { taosWLockLatch(&qwtTestQueryQueueLock); - qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = pMsg; + struct SRpcMsg *newMsg = (struct SRpcMsg *)calloc(1, sizeof(struct SRpcMsg)); + memcpy(newMsg, pMsg, sizeof(struct SRpcMsg)); + qwtTestQueryQueue[qwtTestQueryQueueWIdx++] = newMsg; if (qwtTestQueryQueueWIdx >= qwtTestQueryQueueSize) { qwtTestQueryQueueWIdx = 0; } @@ -201,6 +213,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); } + rpcFreeCont(rsp); break; } case TDMT_VND_RES_READY_RSP: { @@ -213,6 +226,7 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); } + rpcFreeCont(rsp); break; } case TDMT_VND_FETCH_RSP: { @@ -226,16 +240,19 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); + rpcFreeCont(rsp); break; } - case TDMT_VND_DROP_TASK: { + case TDMT_VND_DROP_TASK_RSP: { STaskDropRsp *rsp = (STaskDropRsp *)pRsp->pCont; + rpcFreeCont(rsp); qwtTestCaseFinished = true; break; } } + return; } @@ -271,16 +288,30 @@ int32_t qwtExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { *pRes = NULL; *useconds = 0; } else { + if (qwtTestSinkQueryEnd) { + *pRes = NULL; + *useconds = rand() % 10; + return 0; + } + endExec = rand() % 5; - if (endExec) { - usleep(rand() % qwtTestMaxExecTaskUsec); + int32_t runTime = 0; + if (qwtTestEnableSleep && qwtTestMaxExecTaskUsec > 0) { + runTime = rand() % qwtTestMaxExecTaskUsec; + } + + if (qwtTestEnableSleep) { + if (runTime) { + usleep(runTime); + } + } + if (endExec) { *pRes = (SSDataBlock*)calloc(1, sizeof(SSDataBlock)); (*pRes)->info.rows = rand() % 1000; } else { *pRes = NULL; - usleep(rand() % qwtTestMaxExecTaskUsec); *useconds = rand() % 10; } } @@ -308,9 +339,9 @@ int32_t qwtPutDataBlock(DataSinkHandle handle, const SInputData* pInput, bool* p qwtTestSinkBlockNum++; if (qwtTestSinkBlockNum >= qwtTestSinkMaxBlockNum) { - *pContinue = true; - } else { *pContinue = false; + } else { + *pContinue = true; } taosWUnLockLatch(&qwtTestSinkLock); @@ -653,7 +684,7 @@ void *statusThread(void *param) { } -void *clientThread(void *param) { +void *qwtclientThread(void *param) { int32_t code = 0; uint32_t n = 0; void *mgmt = param; @@ -672,15 +703,14 @@ void *clientThread(void *param) { usleep(1); } - if (qwtTestEnableSleep) { - usleep(rand()%5); - } if (++n % qwtTestPrintNum == 0) { - printf("query:%d\n", n); + printf("case run:%d\n", n); } } + atomic_add_fetch_32(&qwtTestQuitThreadNum, 1); + return NULL; } @@ -689,9 +719,13 @@ void *queryQueueThread(void *param) { SRpcMsg *queryRpc = NULL; void *mgmt = param; - while (!qwtTestStop) { + while (true) { tsem_wait(&qwtTestQuerySem); + if (qwtTestStop && qwtTestQueryQueueNum <= 0) { + break; + } + taosWLockLatch(&qwtTestQueryQueueLock); if (qwtTestQueryQueueNum <= 0 || qwtTestQueryQueueRIdx == qwtTestQueryQueueWIdx) { printf("query queue is empty\n"); @@ -707,6 +741,15 @@ void *queryQueueThread(void *param) { qwtTestQueryQueueNum--; taosWUnLockLatch(&qwtTestQueryQueueLock); + + if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) { + int32_t delay = rand() % qwtTestReqMaxDelayUsec; + + if (delay) { + usleep(delay); + } + } + if (TDMT_VND_QUERY == queryRpc->msgType) { qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc); } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) { @@ -715,8 +758,16 @@ void *queryQueueThread(void *param) { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); } + + free(queryRpc); + + if (qwtTestStop && qwtTestQueryQueueNum <= 0) { + break; + } } + atomic_add_fetch_32(&qwtTestQuitThreadNum, 1); + return NULL; } @@ -725,7 +776,7 @@ void *fetchQueueThread(void *param) { SRpcMsg *fetchRpc = NULL; void *mgmt = param; - while (!qwtTestStop) { + while (true) { tsem_wait(&qwtTestFetchSem); taosWLockLatch(&qwtTestFetchQueueLock); @@ -743,23 +794,45 @@ void *fetchQueueThread(void *param) { qwtTestFetchQueueNum--; taosWUnLockLatch(&qwtTestFetchQueueLock); + if (qwtTestEnableSleep && qwtTestReqMaxDelayUsec > 0) { + int32_t delay = rand() % qwtTestReqMaxDelayUsec; + + if (delay) { + usleep(delay); + } + } + switch (fetchRpc->msgType) { case TDMT_VND_FETCH: qWorkerProcessFetchMsg(mockPointer, mgmt, fetchRpc); + break; case TDMT_VND_RES_READY: qWorkerProcessReadyMsg(mockPointer, mgmt, fetchRpc); + break; case TDMT_VND_TASKS_STATUS: qWorkerProcessStatusMsg(mockPointer, mgmt, fetchRpc); + break; case TDMT_VND_CANCEL_TASK: qWorkerProcessCancelMsg(mockPointer, mgmt, fetchRpc); + break; case TDMT_VND_DROP_TASK: qWorkerProcessDropMsg(mockPointer, mgmt, fetchRpc); + break; default: printf("unknown msg type:%d in fetch queue", fetchRpc->msgType); assert(0); + break; } + + free(fetchRpc); + + if (qwtTestStop && qwtTestFetchQueueNum <= 0) { + break; + } } + atomic_add_fetch_32(&qwtTestQuitThreadNum, 1); + return NULL; } @@ -767,6 +840,7 @@ void *fetchQueueThread(void *param) { } +#if 0 TEST(seqTest, normalCase) { void *mgmt = NULL; @@ -800,31 +874,15 @@ TEST(seqTest, normalCase) { code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); ASSERT_EQ(code, 0); - qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); - code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); - ASSERT_EQ(code, 0); - code = qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); ASSERT_EQ(code, 0); - qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); - code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); - ASSERT_EQ(code, 0); - code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); ASSERT_EQ(code, 0); - qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); - code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); - ASSERT_EQ(code, 0); - code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); ASSERT_EQ(code, 0); - qwtBuildStatusReqMsg(&qwtstatusMsg, &statusRpc); - code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); - ASSERT_EQ(code, 0); - code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); ASSERT_EQ(code, 0); @@ -914,19 +972,31 @@ TEST(seqTest, randCase) { printf("Ready,%d\n", t++); qwtBuildReadyReqMsg(&readyMsg, &readyRpc); code = qWorkerProcessReadyMsg(mockPointer, mgmt, &readyRpc); + if (qwtTestEnableSleep) { + usleep(1); + } } else if (r >= maxr * 2/5 && r < maxr* 3/5) { printf("Fetch,%d\n", t++); qwtBuildFetchReqMsg(&fetchMsg, &fetchRpc); code = qWorkerProcessFetchMsg(mockPointer, mgmt, &fetchRpc); + if (qwtTestEnableSleep) { + usleep(1); + } } else if (r >= maxr * 3/5 && r < maxr * 4/5) { printf("Drop,%d\n", t++); qwtBuildDropReqMsg(&dropMsg, &dropRpc); code = qWorkerProcessDropMsg(mockPointer, mgmt, &dropRpc); + if (qwtTestEnableSleep) { + usleep(1); + } } else if (r >= maxr * 4/5 && r < maxr-1) { printf("Status,%d\n", t++); qwtBuildStatusReqMsg(&statusMsg, &statusRpc); code = qWorkerProcessStatusMsg(mockPointer, mgmt, &statusRpc); ASSERT_EQ(code, 0); + if (qwtTestEnableSleep) { + usleep(1); + } } else { printf("QUIT RAND NOW"); break; @@ -976,7 +1046,236 @@ TEST(seqTest, multithreadRand) { qWorkerDestroy(&mgmt); } -TEST(rcTest, multithread) { +#endif + +TEST(rcTest, shortExecshortDelay) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + + qwtInitLogFile(); + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + stubSetExecTask(); + stubSetCreateExecTask(); + stubSetAsyncKillTask(); + stubSetDestroyTask(); + stubSetDestroyDataSinker(); + stubSetGetDataLength(); + stubSetEndPut(); + stubSetPutDataBlock(); + stubSetGetDataBlock(); + + srand(time(NULL)); + qwtTestStop = false; + qwtTestQuitThreadNum = 0; + + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + ASSERT_EQ(code, 0); + + qwtTestMaxExecTaskUsec = 0; + qwtTestReqMaxDelayUsec = 0; + + tsem_init(&qwtTestQuerySem, 0, 0); + tsem_init(&qwtTestFetchSem, 0, 0); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t t1,t2,t3,t4,t5; + pthread_create(&(t1), &thattr, qwtclientThread, mgmt); + pthread_create(&(t2), &thattr, queryQueueThread, mgmt); + pthread_create(&(t3), &thattr, fetchQueueThread, mgmt); + + while (true) { + if (qwtTestDeadLoop) { + sleep(1); + } else { + sleep(qwtTestMTRunSec); + break; + } + } + + qwtTestStop = true; + + while (true) { + if (qwtTestQuitThreadNum == 3) { + break; + } + + sleep(3); + + tsem_post(&qwtTestQuerySem); + usleep(10); + } + + qwtTestQueryQueueNum = 0; + qwtTestQueryQueueRIdx = 0; + qwtTestQueryQueueWIdx = 0; + qwtTestQueryQueueLock = 0; + qwtTestFetchQueueNum = 0; + qwtTestFetchQueueRIdx = 0; + qwtTestFetchQueueWIdx = 0; + qwtTestFetchQueueLock = 0; + + qWorkerDestroy(&mgmt); +} + +TEST(rcTest, longExecshortDelay) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + + qwtInitLogFile(); + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + stubSetExecTask(); + stubSetCreateExecTask(); + stubSetAsyncKillTask(); + stubSetDestroyTask(); + stubSetDestroyDataSinker(); + stubSetGetDataLength(); + stubSetEndPut(); + stubSetPutDataBlock(); + stubSetGetDataBlock(); + + srand(time(NULL)); + qwtTestStop = false; + qwtTestQuitThreadNum = 0; + + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + ASSERT_EQ(code, 0); + + qwtTestMaxExecTaskUsec = 1000000; + qwtTestReqMaxDelayUsec = 0; + + tsem_init(&qwtTestQuerySem, 0, 0); + tsem_init(&qwtTestFetchSem, 0, 0); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t t1,t2,t3,t4,t5; + pthread_create(&(t1), &thattr, qwtclientThread, mgmt); + pthread_create(&(t2), &thattr, queryQueueThread, mgmt); + pthread_create(&(t3), &thattr, fetchQueueThread, mgmt); + + while (true) { + if (qwtTestDeadLoop) { + sleep(1); + } else { + sleep(qwtTestMTRunSec); + break; + } + } + + qwtTestStop = true; + + + while (true) { + if (qwtTestQuitThreadNum == 3) { + break; + } + + sleep(3); + + tsem_post(&qwtTestQuerySem); + usleep(10); + } + + qwtTestQueryQueueNum = 0; + qwtTestQueryQueueRIdx = 0; + qwtTestQueryQueueWIdx = 0; + qwtTestQueryQueueLock = 0; + qwtTestFetchQueueNum = 0; + qwtTestFetchQueueRIdx = 0; + qwtTestFetchQueueWIdx = 0; + qwtTestFetchQueueLock = 0; + + qWorkerDestroy(&mgmt); +} + + +TEST(rcTest, shortExeclongDelay) { + void *mgmt = NULL; + int32_t code = 0; + void *mockPointer = (void *)0x1; + + qwtInitLogFile(); + + stubSetStringToPlan(); + stubSetRpcSendResponse(); + stubSetExecTask(); + stubSetCreateExecTask(); + stubSetAsyncKillTask(); + stubSetDestroyTask(); + stubSetDestroyDataSinker(); + stubSetGetDataLength(); + stubSetEndPut(); + stubSetPutDataBlock(); + stubSetGetDataBlock(); + + srand(time(NULL)); + qwtTestStop = false; + qwtTestQuitThreadNum = 0; + + code = qWorkerInit(NODE_TYPE_VNODE, 1, NULL, &mgmt, mockPointer, qwtPutReqToQueue); + ASSERT_EQ(code, 0); + + qwtTestMaxExecTaskUsec = 0; + qwtTestReqMaxDelayUsec = 1000000; + + tsem_init(&qwtTestQuerySem, 0, 0); + tsem_init(&qwtTestFetchSem, 0, 0); + + pthread_attr_t thattr; + pthread_attr_init(&thattr); + + pthread_t t1,t2,t3,t4,t5; + pthread_create(&(t1), &thattr, qwtclientThread, mgmt); + pthread_create(&(t2), &thattr, queryQueueThread, mgmt); + pthread_create(&(t3), &thattr, fetchQueueThread, mgmt); + + while (true) { + if (qwtTestDeadLoop) { + sleep(1); + } else { + sleep(qwtTestMTRunSec); + break; + } + } + + qwtTestStop = true; + + + while (true) { + if (qwtTestQuitThreadNum == 3) { + break; + } + + sleep(3); + + tsem_post(&qwtTestQuerySem); + usleep(10); + } + + qwtTestQueryQueueNum = 0; + qwtTestQueryQueueRIdx = 0; + qwtTestQueryQueueWIdx = 0; + qwtTestQueryQueueLock = 0; + qwtTestFetchQueueNum = 0; + qwtTestFetchQueueRIdx = 0; + qwtTestFetchQueueWIdx = 0; + qwtTestFetchQueueLock = 0; + + qWorkerDestroy(&mgmt); +} + + +#if 0 +TEST(rcTest, dropTest) { void *mgmt = NULL; int32_t code = 0; void *mockPointer = (void *)0x1; @@ -1025,7 +1324,7 @@ TEST(rcTest, multithread) { qWorkerDestroy(&mgmt); } - +#endif int main(int argc, char** argv) { diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 0ad51d0b57..d5f7b85b1b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -891,7 +891,7 @@ int32_t schHandleResponseMsg(SSchJob *pJob, SSchTask *pTask, int32_t msgType, ch break; } - case TDMT_VND_DROP_TASK: { + case TDMT_VND_DROP_TASK_RSP: { // SHOULD NEVER REACH HERE SCH_TASK_ELOG("invalid status to handle drop task rsp, ref:%d", atomic_load_32(&pJob->ref)); SCH_ERR_JRET(TSDB_CODE_SCH_INTERNAL_ERROR);