diff --git a/include/dnode/vnode/tqCommon.h b/include/dnode/vnode/tqCommon.h index 69a11638b1..4d5e18520c 100644 --- a/include/dnode/vnode/tqCommon.h +++ b/include/dnode/vnode/tqCommon.h @@ -34,7 +34,7 @@ int32_t tqStreamTaskProcessConsenChkptIdReq(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamProcessCheckpointReadyRsp(SStreamMeta* pMeta, SRpcMsg* pMsg); int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sversion, char* msg, int32_t msgLen, bool isLeader, bool restored); -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen); +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen); int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader); int32_t tqStartTaskCompleteCallback(SStreamMeta* pMeta); int32_t tqStreamTasksGetTotalNum(SStreamMeta* pMeta); diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 5b4224d575..988a650d6c 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -180,7 +180,7 @@ void qCleanExecTaskBlockBuf(qTaskInfo_t tinfo); */ int32_t qAsyncKillTask(qTaskInfo_t tinfo, int32_t rspCode); -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waiting); +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration); bool qTaskIsExecuting(qTaskInfo_t qinfo); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1f12b8addd..b7d560e060 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -760,7 +760,6 @@ int32_t streamMetaRemoveTaskInMeta(SStreamMeta* pMeta, STaskId* pKey); int32_t streamMetaRegisterTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask, bool* pAdded); int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaGetNumOfTasks(SStreamMeta* pMeta); -int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); int32_t streamMetaAcquireTaskNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, SStreamTask** pTask); int32_t streamMetaAcquireTaskUnsafe(SStreamMeta* pMeta, STaskId* pId, SStreamTask** pTask); @@ -800,6 +799,7 @@ void streamMetaLoadAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartAllTasks(SStreamMeta* pMeta); int32_t streamMetaStopAllTasks(SStreamMeta* pMeta); int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); +int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId); bool streamMetaAllTasksReady(const SStreamMeta* pMeta); int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask); int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts); diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index 6899140af9..6eee8c510b 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -155,7 +155,7 @@ int32_t sndProcessWriteMsg(SSnode *pSnode, SRpcMsg *pMsg, SRpcMsg *pRsp) { } case TDMT_STREAM_TASK_DROP: - return tqStreamTaskProcessDropReq(pSnode->pMeta, &pSnode->msgCb, pMsg->pCont, pMsg->contLen); + return tqStreamTaskProcessDropReq(pSnode->pMeta, pMsg->pCont, pMsg->contLen); case TDMT_VND_STREAM_TASK_UPDATE: return tqStreamTaskProcessUpdateReq(pSnode->pMeta, &pSnode->msgCb, pMsg, true); case TDMT_VND_STREAM_TASK_RESET: diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index bd2a7207f2..1c0f73249c 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1118,7 +1118,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { } int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { - return tqStreamTaskProcessDropReq(pTq->pStreamMeta, &pTq->pVnode->msgCb, msg, msgLen); + return tqStreamTaskProcessDropReq(pTq->pStreamMeta, msg, msgLen); } int32_t tqProcessTaskUpdateCheckpointReq(STQ* pTq, char* msg, int32_t msgLen) { diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 817868a1d6..76f0fd6bc6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -688,7 +688,7 @@ int32_t tqStreamTaskProcessDeployReq(SStreamMeta* pMeta, SMsgCb* cb, int64_t sve return code; } -int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, int32_t msgLen) { +int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen) { SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -860,7 +860,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) { - code = streamMetaDropTask(pMeta, req.streamId, req.taskId); + code = streamMetaStopOneTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while SStreamTask* pTask = NULL; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index f271679e37..1aa114a02a 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -972,34 +972,43 @@ int32_t qAsyncKillTask(qTaskInfo_t qinfo, int32_t rspCode) { return TSDB_CODE_SUCCESS; } -int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { +int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitDuration) { int64_t st = taosGetTimestampMs(); SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; if (pTaskInfo == NULL) { return TSDB_CODE_QRY_INVALID_QHANDLE; } - qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); + if (waitDuration > 0) { + qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitDuration/1000.0); + } else { + qDebug("%s async killed execTask", GET_TASKID(pTaskInfo)); + } + setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); -// while (1) { -// taosWLockLatch(&pTaskInfo->lock); -// if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again -// taosWUnLockLatch(&pTaskInfo->lock); -// -// taosMsleep(200); -// -// int64_t d = taosGetTimestampMs() - st; -// if (d >= waitingDuration && waitingDuration >= 0) { -// qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); -// return TSDB_CODE_SUCCESS; -// } -// } else { // not running now -// pTaskInfo->code = rspCode; -// taosWUnLockLatch(&pTaskInfo->lock); - return TSDB_CODE_SUCCESS; -// } -// } + if (waitDuration > 0) { + while (1) { + taosWLockLatch(&pTaskInfo->lock); + if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again + taosWUnLockLatch(&pTaskInfo->lock); + + taosMsleep(200); + + int64_t d = taosGetTimestampMs() - st; + if (d >= waitDuration && waitDuration >= 0) { + qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitDuration / 1000.0); + return TSDB_CODE_SUCCESS; + } + } else { // not running now + pTaskInfo->code = rspCode; + taosWUnLockLatch(&pTaskInfo->lock); + return TSDB_CODE_SUCCESS; + } + } + } + + return TSDB_CODE_SUCCESS; } bool qTaskIsExecuting(qTaskInfo_t qinfo) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index c7e0567d20..780a369409 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -897,7 +897,7 @@ static int32_t streamTaskSendTransSuccessMsg(SStreamTask* pTask, void* param) { // let's kill the query procedure within stream, to end it ASAP. if (pTask->info.taskLevel != TASK_LEVEL__SINK && pTask->exec.pExecutor != NULL) { - code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, 5000); + code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, -1); if (code != TSDB_CODE_SUCCESS) { stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code)); } @@ -965,7 +965,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t return 0; } -int32_t streamMetaStopTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +int32_t streamMetaStopOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; int32_t code = 0; int32_t vgId = pMeta->vgId; diff --git a/source/libs/stream/src/streamStartTask.c b/source/libs/stream/src/streamStartTask.c index 9c16ff036e..d9ca506849 100644 --- a/source/libs/stream/src/streamStartTask.c +++ b/source/libs/stream/src/streamStartTask.c @@ -447,7 +447,6 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) { continue; } - int64_t refId = pTask->id.refId; int32_t ret = streamTaskStop(pTask); if (ret) { stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));