From 192c47a7c257cccdc17f27db19cc86f4094d5ac0 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Sep 2023 17:27:10 +0800 Subject: [PATCH] fix(stream): add some logs. --- source/libs/stream/src/streamDispatch.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bf9c0d44db..fe72ff05e6 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1061,7 +1061,11 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t leftRsp = 0; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt); + leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); + ASSERT(leftRsp >= 0); + if (leftRsp > 0) { stDebug( "s-task:%s recv dispatch rsp, msgId:%d from 0x%x(vgId:%d), downstream task input status:%d code:%d, waiting for %d rsp", id, msgId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->inputStatus, code, leftRsp); @@ -1131,6 +1135,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (numOfFailed > 0) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { atomic_store_32(&pTask->shuffleDispatcher.waitingRspCnt, numOfFailed); + stDebug("s-task:%s waiting rsp set to be %d", id, pTask->shuffleDispatcher.waitingRspCnt); } int8_t ref = atomic_add_fetch_8(&pTask->status.timerActive, 1);