From db4a00c74ed4ac0f58e360af579ce8ec5580884f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 5 Jul 2024 08:55:40 +0800 Subject: [PATCH] fix(stream): not restart for reset task status. --- source/dnode/mnode/impl/src/mndStream.c | 8 ++++---- source/dnode/vnode/src/tqCommon/tqCommon.c | 8 ++++---- source/libs/stream/src/streamCheckStatus.c | 3 ++- 3 files changed, 10 insertions(+), 9 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 536dfab331..f5b944de45 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -2701,8 +2701,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { } } - mInfo("vgId:%d meta-stored checkpointId for stream:0x%" PRIx64 " %s is:%" PRId64, req.nodeId, req.streamId, - pStream->name, pStream->checkpointId); + mInfo("vgId:%d stream:0x%" PRIx64 " %s meta-stored checkpointId:%" PRId64 " stream:0x%" PRIx64 " %s", req.nodeId, + req.streamId, pStream->name, pStream->checkpointId); int32_t numOfTasks = (pStream == NULL) ? 0 : mndGetNumOfStreamTasks(pStream); if ((pStream != NULL) && (pStream->checkpointId == 0)) { // not generated checkpoint yet, return 0 directly @@ -2730,8 +2730,8 @@ static int32_t mndProcessConsensusCheckpointId(SRpcMsg *pMsg) { } if (chkId == req.checkpointId) { - mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64, req.nodeId, req.streamId, - pStream->name, pStream->checkpointId); + mDebug("vgId:%d stream:0x%" PRIx64 " %s consensus-checkpointId is:%" PRId64 ", meta-stored checkpointId:%" PRId64, + req.nodeId, req.streamId, pStream->name, chkId, pStream->checkpointId); mndSendQuickConsensusChkptIdRsp(&req, TSDB_CODE_SUCCESS, req.streamId, chkId, &pMsg->info); taosThreadMutexUnlock(&execInfo.lock); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1999134754..c40332ff39 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -871,7 +871,6 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); taosThreadMutexLock(&pTask->lock); - streamTaskClearCheckInfo(pTask, true); // clear flag set during do checkpoint, and open inputQ for all upstream tasks @@ -886,9 +885,10 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { streamTaskSetStatusReady(pTask); } else if (pState->state == TASK_STATUS__UNINIT) { - tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); - ASSERT(pTask->status.downstreamReady == 0); - tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); +// tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); +// ASSERT(pTask->status.downstreamReady == 0); +// tqStreamTaskRestoreCheckpoint(pMeta, pTask->id.streamId, pTask->id.taskId); + tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } else { tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 22d336a549..2d8fe4a0e1 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -668,6 +668,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, true, id); // not record the failed of the current task if try to close current vnode + // otherwise, the put of message operation may incur invalid read of message queue. if (!pMeta->closeFlag) { addDownstreamFailedStatusResultAsync(pTask->pMsgCb, vgId, pTask->id.streamId, pTask->id.taskId); } @@ -676,7 +677,7 @@ void rspMonitorFn(void* param, void* tmrId) { return; } - if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY || state == TASK_STATUS__PAUSE) { + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);