fix(stream): reduce the counter when remove stream tasks.

This commit is contained in:
Haojun Liao 2023-09-27 09:36:20 +08:00
parent ecb3b44026
commit f2848de12a
3 changed files with 8 additions and 5 deletions

View File

@ -1043,12 +1043,14 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
pTask->msgInfo.pData = NULL;
int64_t el = taosGetTimestampMs() - pTask->msgInfo.startTs;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
// put data into inputQ of current task is also allowed
if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
stDebug("s-task:%s downstream task:0x%x resume to normal from inputQ blocking, blocking time:%" PRId64 "ms",
pTask->id.idStr, downstreamId, el);
} else {
stDebug("s-task:%s dispatch completed, elapsed time:%"PRId64"ms", pTask->id.idStr, el);
}
// now ready for next data output
@ -1110,7 +1112,6 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
int32_t leftRsp = 0;
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
stDebug("s-task:%s waiting rsp:%d", id, pTask->shuffleDispatcher.waitingRspCnt);
leftRsp = atomic_sub_fetch_32(&pTask->shuffleDispatcher.waitingRspCnt, 1);
ASSERT(leftRsp >= 0);

View File

@ -248,8 +248,8 @@ int32_t streamScanHistoryData(SStreamTask* pTask) {
size += blockDataGetSize(output) + sizeof(SSDataBlock) + sizeof(SColumnInfoData) * blockDataGetNumOfCols(&block);
if ((++numOfBlocks) >= outputBatchSize || size >= STREAM_RESULT_DUMP_SIZE_THRESHOLD) {
stDebug("s-task:%s scan exec numOfBlocks:%d, output num-limit:%d, size-limit:%d reached", pTask->id.idStr, numOfBlocks,
outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
stDebug("s-task:%s scan exec numOfBlocks:%d, size:%d output num-limit:%d, size-limit:%d reached",
pTask->id.idStr, numOfBlocks, size, outputBatchSize, STREAM_RESULT_DUMP_SIZE_THRESHOLD);
break;
}
}

View File

@ -550,6 +550,8 @@ int32_t streamMetaUnregisterTask(SStreamMeta* pMeta, int64_t streamId, int32_t t
(*ppStreamTask)->historyTaskId.taskId = 0;
(*ppStreamTask)->historyTaskId.streamId = 0;
}
} else {
atomic_sub_fetch_32(&pMeta->numOfStreamTasks, 1);
}
taosHashRemove(pMeta->pTasksMap, &id, sizeof(id));