diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c
index 76f0d41a28..bcb479e71e 100644
--- a/source/libs/stream/src/streamExec.c
+++ b/source/libs/stream/src/streamExec.c
@@ -377,7 +377,7 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
   if (pStreamTask->info.taskLevel == TASK_LEVEL__SOURCE) {
     // update the scan data range for source task.
     qDebug("s-task:%s level:%d stream task window %" PRId64 " - %" PRId64 " update to %" PRId64 " - %" PRId64
-           ", status:%s, sched-status:%d",
+           ", status:%s, sched-status:%d",
            pStreamTask->id.idStr, TASK_LEVEL__SOURCE, pTimeWindow->skey, pTimeWindow->ekey, INT64_MIN,
            pTimeWindow->ekey, streamGetTaskStatusStr(TASK_STATUS__NORMAL), pStreamTask->status.schedStatus);
   } else {
@@ -399,6 +399,61 @@ static int32_t streamTransferStateToStreamTask(SStreamTask* pTask) {
   return TSDB_CODE_SUCCESS;
 }
 
+static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
+                                    const char* id) {
+  int32_t retryTimes = 0;
+  int32_t MAX_RETRY_TIMES = 5;
+
+  while (1) {
+    if (streamTaskShouldPause(&pTask->status)) {
+      qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks);
+      return TSDB_CODE_SUCCESS;
+    }
+
+    SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
+    if (qItem == NULL) {
+      if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
+        taosMsleep(10);
+        qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes);
+        continue;
+      }
+
+      qDebug("===stream===break batchSize:%d", *numOfBlocks);
+      return TSDB_CODE_SUCCESS;
+    }
+
+    // do not merge blocks for sink node
+    if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
+      *numOfBlocks = 1;
+      *pInput = qItem;
+      return TSDB_CODE_SUCCESS;
+    }
+
+    if (*pInput == NULL) {
+      ASSERT((*numOfBlocks) == 0);
+      *pInput = qItem;
+    } else {
+      // todo we need to sort the data block, instead of just appending into the array list.
+      void* newRet = streamMergeQueueItem(*pInput, qItem);
+      if (newRet == NULL) {
+        qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
+        streamQueueProcessFail(pTask->inputQueue);
+        return TSDB_CODE_SUCCESS;
+      }
+
+      *pInput = newRet;
+    }
+
+    *numOfBlocks += 1;
+    streamQueueProcessSuccess(pTask->inputQueue);
+
+    if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
+      qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
+      return TSDB_CODE_SUCCESS;
+    }
+  }
+}
+
 /**
  * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the
  * appropriate batch of blocks should be handled in 5 to 10 sec.
@@ -407,73 +462,15 @@ int32_t streamExecForAll(SStreamTask* pTask) {
   const char* id = pTask->id.idStr;
 
   while (1) {
-    int32_t batchSize = 1;
-    int16_t times = 0;
-
+    int32_t batchSize = 0;
     SStreamQueueItem* pInput = NULL;
 
     // merge multiple input data if possible in the input queue.
- qDebug("s-task:%s start to extract data block from inputQ, status:%s", id, streamGetTaskStatusStr(pTask->status.taskStatus)); - - while (1) { - // downstream task's input queue is blocked, stop immediately - if (streamTaskShouldPause(&pTask->status) || (pTask->outputStatus == TASK_OUTPUT_STATUS__BLOCKED) || - streamTaskShouldStop(&pTask->status)) { - if (batchSize > 1) { - break; - } else { - qDebug("123 %s", pTask->id.idStr); - return 0; - } - } - - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { - times++; - taosMsleep(10); - qDebug("===stream===try again batchSize:%d", batchSize); - continue; - } - - qDebug("===stream===break batchSize:%d", batchSize); - break; - } - - if (pInput == NULL) { - pInput = qItem; - streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->info.taskLevel == TASK_LEVEL__SINK) { - break; - } - } else { - // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = NULL; - if ((newRet = streamMergeQueueItem(pInput, qItem)) == NULL) { - streamQueueProcessFail(pTask->inputQueue); - break; - } else { - batchSize++; - pInput = newRet; - streamQueueProcessSuccess(pTask->inputQueue); - - if (batchSize > MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, - MAX_STREAM_EXEC_BATCH_NUM); - break; - } - } - } - } - - if (streamTaskShouldStop(&pTask->status)) { - if (pInput) { - streamFreeQitem(pInput); - } - return 0; - } + qDebug("s-task:%s start to extract data block from inputQ", id); + /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { + ASSERT(batchSize == 0); if (pTask->info.fillHistory && pTask->status.transferState) { int32_t code = streamTransferStateToStreamTask(pTask); } @@ -532,8 +529,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { streamTaskExecImpl(pTask, pInput, &resSize, &totalBlocks); double el = (taosGetTimestampMs() - st) / 1000.0; - qDebug("s-task:%s batch of (%d)input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", - id, batchSize, el, resSize / 1048576.0, totalBlocks); + qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", + id, el, resSize / 1048576.0, totalBlocks); streamFreeQitem(pInput); }