diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 9a53ffa088..bfbaed7bf6 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -317,6 +317,60 @@ int32_t updateCheckPointInfo(SStreamTask* pTask) { return TSDB_CODE_SUCCESS; } +static int32_t extractMsgFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks, + const char* id) { + int32_t retryTimes = 0; + int32_t MAX_RETRY_TIMES = 5; + + while (1) { + if (streamTaskShouldPause(&pTask->status)) { + qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + if (pTask->taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + taosMsleep(10); + qDebug("===stream===try again batchSize:%d, retry:%d", *numOfBlocks, retryTimes); + continue; + } + + qDebug("===stream===break batchSize:%d", *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + // do not merge blocks for sink node + if (pTask->taskLevel == TASK_LEVEL__SINK) { + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } + + if (pInput == NULL) { + ASSERT((*numOfBlocks) == 0); + *pInput = qItem; + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = streamMergeQueueItem(*pInput, qItem); + if (newRet == NULL) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + streamQueueProcessFail(pTask->inputQueue); + return TSDB_CODE_SUCCESS; + } + pInput = newRet; + } + + *numOfBlocks += 1; + streamQueueProcessSuccess(pTask->inputQueue); + + if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + return TSDB_CODE_SUCCESS; + } + } +} + /** * todo: the batch of blocks should be tuned dynamic, according to the total elapsed time of each batch of blocks, the * appropriate batch of blocks should be handled in 5 to 10 sec. @@ -326,73 +380,12 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchSize = 0; - int16_t times = 0; - SStreamQueueItem* pInput = NULL; // merge multiple input data if possible in the input queue. qDebug("s-task:%s start to extract data block from inputQ", id); - while (1) { - if (streamTaskShouldPause(&pTask->status)) { - qDebug("s-task:%s task should pause, input blocks:%d", pTask->id.idStr, batchSize); - if (batchSize > 0) { - break; - } else { - return 0; - } - } - - SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); - if (qItem == NULL) { - if (pTask->taskLevel == TASK_LEVEL__SOURCE && batchSize < MIN_STREAM_EXEC_BATCH_NUM && times < 5) { - times++; - taosMsleep(10); - qDebug("===stream===try again batchSize:%d", batchSize); - continue; - } - - qDebug("===stream===break batchSize:%d", batchSize); - break; - } - - if (pInput == NULL) { - batchSize += 1; - - pInput = qItem; - streamQueueProcessSuccess(pTask->inputQueue); - if (pTask->taskLevel == TASK_LEVEL__SINK) { - break; - } - } else { - // todo we need to sort the data block, instead of just appending into the array list. - ASSERT(batchSize >= 1); - - void* newRet = streamMergeQueueItem(pInput, qItem); - if (newRet == NULL) { - streamQueueProcessFail(pTask->inputQueue); - break; - } else { - batchSize += 1; - - pInput = newRet; - streamQueueProcessSuccess(pTask->inputQueue); - - if (batchSize >= MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); - break; - } - } - } - } - -// if (streamTaskShouldStop(&pTask->status)) { -// if (pInput) { -// streamFreeQitem(pInput); -// } -// return 0; -// } - + /*int32_t code = */extractMsgFromInputQ(pTask, &pInput, &batchSize, id); if (pInput == NULL) { ASSERT(batchSize == 0); break; @@ -409,8 +402,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (pTask->taskLevel == TASK_LEVEL__SOURCE) { int8_t status = atomic_load_8(&pTask->status.taskStatus); if (status != TASK_STATUS__NORMAL && status != TASK_STATUS__PAUSE) { - qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, - atomic_load_8(&pTask->status.taskStatus)); + qError("stream task wait for the end of fill history, s-task:%s, status:%d", id, status); taosMsleep(100); } else { break; @@ -463,6 +455,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { double el = (taosGetTimestampMs() - st) / 1000.0; qDebug("s-task:%s batch of input blocks exec end, elapsed time:%.2fs, result size:%.2fMiB, numOfBlocks:%d", id, el, resSize / 1048576.0, totalBlocks); + streamFreeQitem(pInput); }