From 289ec6cf666bf9159736fa0edbaafc5ee589a5f7 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 17 Apr 2024 17:57:12 +0800 Subject: [PATCH] fix(stream): reset checkpoint info after receiving task-reset info. --- source/dnode/vnode/src/tqCommon/tqCommon.c | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 1c3a760bab..f2b90606d9 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -923,13 +923,20 @@ int32_t tqStreamTaskProcessTaskResetReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { tqDebug("s-task:%s receive task-reset msg from mnode, reset status and ready for data processing", pTask->id.idStr); taosThreadMutexLock(&pTask->lock); + streamTaskClearCheckInfo(pTask, true); // clear flag set during do checkpoint, and open inputQ for all upstream tasks - if (streamTaskGetStatus(pTask)->state == TASK_STATUS__CK) { + SStreamTaskState *pState = streamTaskGetStatus(pTask); + if (pState->state == TASK_STATUS__CK) { tqDebug("s-task:%s reset task status from checkpoint, current checkpointingId:%" PRId64 ", transId:%d", pTask->id.idStr, pTask->chkInfo.checkpointingId, pTask->chkInfo.transId); - streamTaskClearCheckInfo(pTask, true); streamTaskSetStatusReady(pTask); + } else if (pState->state == TASK_STATUS__UNINIT) { + tqDebug("s-task:%s start task by checking downstream tasks", pTask->id.idStr); + ASSERT(pTask->status.downstreamReady == 0); + /*int32_t ret = */ streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT); + } else { + tqDebug("s-task:%s status:%s do nothing after receiving reset-task from mnode", pTask->id.idStr, pState->name); } taosThreadMutexUnlock(&pTask->lock);