diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 8bca0e6ec2..72fd520498 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -237,7 +237,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { int32_t leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); - qDebug("task %d is shuffle, left waiting rsp %d", pTask->id.taskId, leftRsp); + qDebug("s-task:%s is shuffle, left waiting rsp %d", pTask->id.idStr, leftRsp); if (leftRsp > 0) { return 0; } @@ -246,6 +246,8 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int8_t old = atomic_exchange_8(&pTask->outputStatus, pRsp->inputStatus); ASSERT(old == TASK_OUTPUT_STATUS__WAIT); + qDebug("s-task:%s receive dispatch rsp, output status:%d", pTask->id.idStr, pTask->outputStatus); + // the input queue of the (down stream) task that receive the output data is full, so the TASK_INPUT_STATUS_BLOCKED is rsp // todo we need to send EMPTY PACKAGE to detect if the input queue is available for output of upstream task, every 50 ms. if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) { diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index 401a8b9e74..1e939cb071 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -510,14 +510,17 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { int8_t old = atomic_val_compare_exchange_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL, TASK_OUTPUT_STATUS__WAIT); if (old != TASK_OUTPUT_STATUS__NORMAL) { - qDebug("s-task:%s task wait for dispatch rsp, not dispatch now", pTask->id.idStr); + qDebug("s-task:%s task wait for dispatch rsp, not dispatch now, output status:%d", pTask->id.idStr, old); return 0; } + qDebug("s-task:%s start to dispatch msg, output status:%d", pTask->id.idStr, pTask->outputStatus); + SStreamDataBlock* pDispatchedBlock = streamQueueNextItem(pTask->outputQueue); if (pDispatchedBlock == NULL) { - qDebug("s-task:%s stop dispatching since no output in output queue", pTask->id.idStr); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + qDebug("s-task:%s stop dispatching since no output in output queue, output status:%d", pTask->id.idStr, + pTask->outputStatus); return 0; } @@ -527,6 +530,7 @@ int32_t streamDispatchStreamBlock(SStreamTask* pTask) { if (code != TSDB_CODE_SUCCESS) { streamQueueProcessFail(pTask->outputQueue); atomic_store_8(&pTask->outputStatus, TASK_OUTPUT_STATUS__NORMAL); + qDebug("s-task:%s failed to dispatch msg to downstream, output status:%d", pTask->id.idStr, pTask->outputStatus); } // this block can be freed only when it has been pushed to down stream.