diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index cc12566389..1f12b8addd 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -58,7 +58,7 @@ extern "C" { #define STREAM_EXEC_T_STOP_ALL_TASKS (-5) #define STREAM_EXEC_T_RESUME_TASK (-6) #define STREAM_EXEC_T_ADD_FAILED_TASK (-7) -#define STREAM_EXEC_T_DROP_ONE_TASK (-8) +#define STREAM_EXEC_T_STOP_ONE_TASK (-8) typedef struct SStreamTask SStreamTask; typedef struct SStreamQueue SStreamQueue; diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 29b9141759..817868a1d6 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -720,40 +720,31 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, SMsgCb* cb, char* msg, in // drop the related fill-history task firstly if (hTaskId.taskId != 0 && hTaskId.streamId != 0) { - code = streamTaskSchedTask(cb, vgId, hTaskId.streamId, (int32_t)hTaskId.taskId, STREAM_EXEC_T_DROP_ONE_TASK); + tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x firstly", pReq->taskId, vgId, (int32_t)hTaskId.taskId); + code = streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); if (code) { - tqError("s-task:0x%x vgId:%d failed to create msg to drop rel fill-history task:0x%x, code:%s", pReq->taskId, - vgId, (int32_t)hTaskId.taskId, tstrerror(code)); - } else { - tqDebug("s-task:0x%x vgId:%d create msg to drop rel fill-history task:0x%x succ", pReq->taskId, vgId, + tqDebug("s-task:0x%x vgId:%d drop rel fill-history task:0x%x failed", pReq->taskId, vgId, (int32_t)hTaskId.taskId); } } // drop the stream task now - code = streamTaskSchedTask(cb, vgId, pReq->streamId, pReq->taskId, STREAM_EXEC_T_DROP_ONE_TASK); + code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); if (code) { - tqError("s-task:0x%x vgId:%d failed to create msg to drop task, code:%s", pReq->taskId, vgId, tstrerror(code)); - } else { - tqDebug("s-task:0x%x vgId:%d create msg to drop succ", pReq->taskId, vgId); + tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); } -// code = streamMetaUnregisterTask(pMeta, pReq->streamId, pReq->taskId); -// if (code) { -// tqDebug("s-task:0x%x vgId:%d drop task failed", pReq->taskId, vgId); -// } - // commit the update -// int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); -// tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + tqDebug("vgId:%d task:0x%x dropped, remain tasks:%d", vgId, pReq->taskId, numOfTasks); if (streamMetaCommit(pMeta) < 0) { // persist to disk } streamMetaWUnLock(pMeta); + tqDebug("vgId:%d process drop task:0x%x completed", vgId, pReq->taskId); - tqDebug("vgId:%d process drop task:0x%x async completed", vgId, pReq->taskId); return 0; // always return success } @@ -868,7 +859,7 @@ int32_t tqStreamTaskProcessRunReq(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLead } else if (type == STREAM_EXEC_T_ADD_FAILED_TASK) { code = streamMetaAddFailedTask(pMeta, req.streamId, req.taskId); return code; - } else if (type == STREAM_EXEC_T_DROP_ONE_TASK) { + } else if (type == STREAM_EXEC_T_STOP_ONE_TASK) { code = streamMetaDropTask(pMeta, req.streamId, req.taskId); return code; } else if (type == STREAM_EXEC_T_RESUME_TASK) { // task resume to run after idle for a while diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 3d60e4d797..f271679e37 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -982,24 +982,24 @@ int32_t qKillTask(qTaskInfo_t tinfo, int32_t rspCode, int64_t waitingDuration) { qDebug("%s sync killed execTask, and waiting for %.2fs", GET_TASKID(pTaskInfo), waitingDuration/1000.0); setTaskKilled(pTaskInfo, TSDB_CODE_TSC_QUERY_KILLED); - while (1) { - taosWLockLatch(&pTaskInfo->lock); - if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again - taosWUnLockLatch(&pTaskInfo->lock); - - taosMsleep(200); - - int64_t d = taosGetTimestampMs() - st; - if (d >= waitingDuration && waitingDuration >= 0) { - qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); - return TSDB_CODE_SUCCESS; - } - } else { // not running now - pTaskInfo->code = rspCode; - taosWUnLockLatch(&pTaskInfo->lock); +// while (1) { +// taosWLockLatch(&pTaskInfo->lock); +// if (qTaskIsExecuting(pTaskInfo)) { // let's wait for 100 ms and try again +// taosWUnLockLatch(&pTaskInfo->lock); +// +// taosMsleep(200); +// +// int64_t d = taosGetTimestampMs() - st; +// if (d >= waitingDuration && waitingDuration >= 0) { +// qWarn("%s waiting more than %.2fs, not wait anymore", GET_TASKID(pTaskInfo), waitingDuration/1000.0); +// return TSDB_CODE_SUCCESS; +// } +// } else { // not running now +// pTaskInfo->code = rspCode; +// taosWUnLockLatch(&pTaskInfo->lock); return TSDB_CODE_SUCCESS; - } - } +// } +// } } bool qTaskIsExecuting(qTaskInfo_t qinfo) { diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index d62165a72c..a43cdd0b85 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -622,15 +622,19 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV pReq->transId); streamMutexUnlock(&pTask->lock); - { // destroy the related fill-history tasks - // drop task should not in the meta-lock, and drop the related fill-history task now - if (pReq->dropRelHTask) { - code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); - if (code) { - stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); - } - } - } + { // destroy the related fill-history tasks + // drop task should not in the meta-lock, and drop the related fill-history task now + if (pReq->dropRelHTask) { + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", + id, vgId, pReq->taskId, numOfTasks); + } + + if (pReq->dropRelHTask) { + code = streamMetaCommit(pMeta); + } + } // always return true return TSDB_CODE_SUCCESS; @@ -704,12 +708,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - code = streamTaskSchedTask(pTask->pMsgCb, vgId, pReq->hStreamId, pReq->hTaskId, STREAM_EXEC_T_DROP_ONE_TASK); - if (code) { - stError("s-task:%s failed to create msg to drop related fill-history task, code:%s", id, tstrerror(code)); - } else { - stDebug("s-task:%s vgId:%d create msg to drop related fill-history task:0x%x", id, vgId, (int32_t)pReq->hTaskId); - } + code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, + (int32_t)pReq->hTaskId, numOfTasks); } code = streamMetaCommit(pMeta); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index ce1c228b7b..036cb43b12 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -965,7 +965,7 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t return 0; } -int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { +int32_t streamMetaStopTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) { SStreamTask* pTask = NULL; int32_t code = 0; int32_t vgId = pMeta->vgId; @@ -973,18 +973,18 @@ int32_t streamMetaDropTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) streamMetaWLock(pMeta); - code = streamMetaUnregisterTask(pMeta, streamId, taskId); - numOfTasks = streamMetaGetNumOfTasks(pMeta); - if (code) { - stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); - } - - code = streamMetaCommit(pMeta); - if (code) { - stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); - } else { - stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks); - } +// code = streamMetaUnregisterTask(pMeta, streamId, taskId); +// numOfTasks = streamMetaGetNumOfTasks(pMeta); +// if (code) { +// stError("vgId:%d failed to drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); +// } +// +// code = streamMetaCommit(pMeta); +// if (code) { +// stError("vgId:%d failed to commit after drop task:0x%x, code:%s", vgId, taskId, tstrerror(code)); +// } else { +// stDebug("s-task:0x%"PRIx64"-0x%x vgId:%d dropped, remain tasks:%d", streamId, taskId, pMeta->vgId, numOfTasks); +// } streamMetaWUnLock(pMeta); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index f0b288f096..e4e8a37b37 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1287,8 +1287,8 @@ const char* streamTaskGetExecType(int32_t type) { return "resume-task-from-idle"; case STREAM_EXEC_T_ADD_FAILED_TASK: return "record-start-failed-task"; - case STREAM_EXEC_T_DROP_ONE_TASK: - return "drop-one-task"; + case STREAM_EXEC_T_STOP_ONE_TASK: + return "stop-one-task"; case 0: return "exec-all-tasks"; default: