diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index bcfbafd4c9..021bd642bd 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -629,7 +629,9 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu dropConnection = ctx->dropConnection; // Note: ctx freed, no need to unlock it - locked = false; + locked = false; + + break; } else if (QW_IS_EVENT_RECEIVED(ctx, QW_EVENT_CANCEL)) { QW_ERR_JRET(qwAddTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_CANCELLED)); @@ -639,6 +641,8 @@ int32_t qwHandlePrePhaseEvents(QW_FPARAMS_DEF, int8_t phase, SQWPhaseInput *inpu QW_SET_RSP_CODE(ctx, output->rspCode); cancelConnection = ctx->cancelConnection; + + break; } if (ctx->rspCode) { @@ -1215,8 +1219,6 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { QW_ERR_JRET(qwKillTaskHandle(QW_FPARAMS(), ctx)); QW_ERR_JRET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_DROPPING)); - - ctx->dropConnection = qwMsg->connection; } else if (ctx->phase > 0) { QW_ERR_JRET(qwDropTaskStatus(QW_FPARAMS())); QW_ERR_JRET(qwDropTaskCtx(QW_FPARAMS(), QW_WRITE)); @@ -1225,11 +1227,11 @@ int32_t qwProcessDrop(QW_FPARAMS_DEF, SQWMsg *qwMsg) { locked = false; needRsp = true; - - QW_ERR_JRET(TSDB_CODE_QRY_TASK_DROPPED); } - if (!needRsp) { + if (!needRsp) { + ctx->dropConnection = qwMsg->connection; + QW_SET_EVENT_RECEIVED(ctx, QW_EVENT_DROP); } @@ -1239,6 +1241,10 @@ _return: QW_UPDATE_RSP_CODE(ctx, code); } + if (locked) { + QW_UNLOCK(QW_WRITE, &ctx->lock); + } + if (ctx) { qwReleaseTaskCtx(mgmt, ctx); } diff --git a/source/libs/qworker/test/qworkerTests.cpp b/source/libs/qworker/test/qworkerTests.cpp index a94af4f69b..5720c2d47c 100644 --- a/source/libs/qworker/test/qworkerTests.cpp +++ b/source/libs/qworker/test/qworkerTests.cpp @@ -137,7 +137,7 @@ void qwtBuildStatusReqMsg(SSchTasksStatusReq *statusMsg, SRpcMsg *statusRpc) { } int32_t qwtStringToPlan(const char* str, SSubplan** subplan) { - *subplan = 0x1; + *subplan = (SSubplan *)0x1; return 0; } @@ -192,10 +192,10 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code) { qwtBuildReadyReqMsg(&qwtreadyMsg, &qwtreadyRpc); - qwtPutReqToFetchQueue(0x1, &qwtreadyRpc); + qwtPutReqToFetchQueue((void *)0x1, &qwtreadyRpc); } else { qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue(0x1, &qwtdropRpc); + qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); } break; @@ -205,10 +205,10 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); - qwtPutReqToFetchQueue(0x1, &qwtfetchRpc); + qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); } else { qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue(0x1, &qwtdropRpc); + qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); } break; } @@ -217,12 +217,12 @@ void qwtRpcSendResponse(const SRpcMsg *pRsp) { if (0 == pRsp->code && 0 == rsp->completed) { qwtBuildFetchReqMsg(&qwtfetchMsg, &qwtfetchRpc); - qwtPutReqToFetchQueue(0x1, &qwtfetchRpc); + qwtPutReqToFetchQueue((void *)0x1, &qwtfetchRpc); return; } qwtBuildDropReqMsg(&qwtdropMsg, &qwtdropRpc); - qwtPutReqToFetchQueue(0x1, &qwtdropRpc); + qwtPutReqToFetchQueue((void *)0x1, &qwtdropRpc); break; } @@ -352,7 +352,7 @@ int32_t qwtGetDataBlock(DataSinkHandle handle, SOutputData* pOutput) { qwtTestSinkBlockNum--; pOutput->numOfRows = rand() % 10 + 1; pOutput->compressed = 1; - pOutput->pData = malloc(pOutput->numOfRows); + pOutput->pData = (char *)malloc(pOutput->numOfRows); pOutput->queryEnd = qwtTestSinkQueryEnd; if (qwtTestSinkBlockNum == 0) { pOutput->bufStatus = DS_BUF_EMPTY; @@ -648,7 +648,7 @@ void *clientThread(void *param) { qwtTestCaseFinished = false; qwtBuildQueryReqMsg(&queryRpc); - qwtPutReqToQueue(0x1, &queryRpc); + qwtPutReqToQueue((void *)0x1, &queryRpc); while (!qwtTestCaseFinished) { usleep(1); @@ -692,15 +692,16 @@ void *queryQueueThread(void *param) { taosWUnLockLatch(&qwtTestQueryQueueLock); if (TDMT_VND_QUERY == queryRpc->msgType) { - qWorkerProcessQueryMsg(mockPointer, mgmt, &queryRpc); + qWorkerProcessQueryMsg(mockPointer, mgmt, queryRpc); } else if (TDMT_VND_QUERY_CONTINUE == queryRpc->msgType) { - qWorkerProcessCQueryMsg(mockPointer, mgmt, &queryRpc) + qWorkerProcessCQueryMsg(mockPointer, mgmt, queryRpc); } else { printf("unknown msg in query queue, type:%d\n", queryRpc->msgType); assert(0); } } + return NULL; } void *fetchQueueThread(void *param) { @@ -743,6 +744,7 @@ void *fetchQueueThread(void *param) { } } + return NULL; } diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 366cc62a7e..0ad51d0b57 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -275,7 +275,7 @@ int32_t schBuildTaskRalation(SSchJob *pJob, SHashObj *planToTask) { } -int32_t schRecordTaskSucceedNode(SSchTask *pTask) { +int32_t schRecordTaskSucceedNode(SSchJob *pJob, SSchTask *pTask) { int32_t idx = atomic_load_8(&pTask->candidateIdx); SQueryNodeAddr *addr = taosArrayGet(pTask->candidateAddrs, idx); if (NULL == addr) {