diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 756183a624..a177c56f77 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -305,6 +305,17 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin " no need to update the checkpoint info, updated checkpointId:%" PRId64 " checkpointVer:%" PRId64 " ignored", id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer); taosThreadMutexUnlock(&pTask->lock); + + { // destroy the related fill-history tasks + // drop task should not in the meta-lock, and drop the related fill-history task now + if (pReq->dropRelHTask) { + streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); + int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); + stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", + id, vgId, pReq->taskId, numOfTasks); + } + } + return TSDB_CODE_SUCCESS; }