diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 387df52f16..fb8a96124d 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3472,6 +3472,8 @@ typedef struct SVUpdateCheckpointInfoReq { int64_t checkpointVer; int64_t checkpointTs; int32_t transId; + int64_t hStreamId; // add encode/decode + int64_t hTaskId; int8_t dropRelHTask; } SVUpdateCheckpointInfoReq; diff --git a/source/dnode/mnode/impl/src/mndStreamUtil.c b/source/dnode/mnode/impl/src/mndStreamUtil.c index a0731833e6..e47f28c309 100644 --- a/source/dnode/mnode/impl/src/mndStreamUtil.c +++ b/source/dnode/mnode/impl/src/mndStreamUtil.c @@ -716,6 +716,8 @@ static int32_t doSetUpdateChkptAction(SMnode *pMnode, STrans *pTrans, SStreamTas pReq->checkpointTs = pInfo->ts; pReq->dropRelHTask = pInfo->dropHTask; pReq->transId = pInfo->transId; + pReq->hStreamId = pTask->hTaskInfo.id.streamId; + pReq->hTaskId = pTask->hTaskInfo.id.taskId; } } diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index cabccfc0c8..d16c41ec1e 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -646,8 +646,6 @@ int32_t tqStreamTaskProcessDropReq(SStreamMeta* pMeta, char* msg, int32_t msgLen } streamMetaWUnLock(pMeta); - -// tqStreamRemoveTaskBackend(pMeta, &id); return 0; } diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 6696c9f8c2..eedd8f20d6 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -418,7 +418,6 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin int32_t code = 0; const char* id = pTask->id.idStr; SCheckpointInfo* pInfo = &pTask->chkInfo; - STaskId hTaskId = {0}; taosThreadMutexLock(&pTask->lock); @@ -434,7 +433,7 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now streamMetaWUnLock(pMeta); if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, pTask->hTaskInfo.id.streamId, pTask->hTaskInfo.id.taskId); + streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); @@ -476,9 +475,8 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin } if (pReq->dropRelHTask) { - hTaskId = pTask->hTaskInfo.id; stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint", - pReq->taskId, vgId, hTaskId.taskId); + pReq->taskId, vgId, pReq->hTaskId); CLEAR_RELATED_FILLHISTORY_TASK(pTask); } @@ -499,10 +497,10 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, SVUpdateCheckpoin // drop task should not in the meta-lock, and drop the related fill-history task now if (pReq->dropRelHTask) { - streamMetaUnregisterTask(pMeta, hTaskId.streamId, hTaskId.taskId); + streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId); int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta); stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId, - (int32_t)hTaskId.taskId, numOfTasks); + (int32_t)pReq->hTaskId, numOfTasks); } streamMetaWLock(pMeta);