From 37bf387453f1b03e05c48c7e4ace0a3a0f8b3e76 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 27 Sep 2023 09:36:20 +0800 Subject: [PATCH] fix(stream): reduce the counter when remove stream tasks. --- source/libs/stream/src/streamDispatch.c | 7 ++++--- source/libs/stream/src/streamExec.c | 4 ++-- source/libs/stream/src/streamMeta.c | 2 ++ 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/stream/src/streamDispatch.c b/source/libs/stream/src/streamDispatch.c index bd5753cac3..ea8c7c71ac 100644 --- a/source/libs/stream/src/streamDispatch.c +++ b/source/libs/stream/src/streamDispatch.c @@ -1043,12 +1043,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId pTask->msgInfo.pData = NULL; int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs; - stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", - pTask->id.idStr, downstreamId, el); // put data into inputQ of current task is also allowed if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) { pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; + stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms", + pTask->id.idStr, downstreamId, el); + } else { + stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el); } // now ready for next data output @@ -1110,7 +1112,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i int32_t leftRsp = 0; if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt); leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1); ASSERT(leftRsp >= 0); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index a6f7ac27d4..20c1495d49 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -248,8 +248,8 @@ int32_t streamScanHistoryData(SStreamTask* pTask) { size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block); if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) { - stDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks, - outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); + stDebug("s-task:%s scan exec numOfBlocks:%d, size:%d output num-limit:%d, size-limit:%d reached", + pTask->id.idStr, numOfBlocks, size, outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD); break; } } diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 2c96a26ddc..8ae349a31a 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -549,6 +549,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t (*ppStreamTask)->historyTaskId.taskId = 0; (*ppStreamTask)->historyTaskId.streamId = 0; } + } else { + atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1); } taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));