diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 29f667cb66..2f360044c9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -824,14 +824,17 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode) { qDebug("%s sync killed execTask", GET_TASKID(pTaskInfo)); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); - taosWLockLatch(&pTaskInfo->lock); - while (qTaskIsExecuting(pTaskInfo)) { - taosMsleep(10); + while(1) { + taosWLockLatch(&pTaskInfo->lock); + if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again + taosWUnLockLatch(&pTaskInfo->lock); + taosMsleep(100); + } else { // not running now + pTaskInfo->code = rspCode; + taosWUnLockLatch(&pTaskInfo->lock); + return TSDB_CODE_SUCCESS; + } } - pTaskInfo->code = rspCode; - taosWUnLockLatch(&pTaskInfo->lock); - - return TSDB_CODE_SUCCESS; } bool qTaskIsExecuting(qTaskInfo_t qinfo) { diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4fa9b2c66f..0aace1cb5b 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1631,6 +1631,8 @@ int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t ta streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false); } } else { + streamMetaRUnLock(pMeta); + stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped", streamId, taskId, pMeta->vgId); code = TSDB_CODE_STREAM_TASK_NOT_EXIST;