diff --git a/source/libs/stream/src/streamQueue.c b/source/libs/stream/src/streamQueue.c index 74709b29e8..b3dbf827a5 100644 --- a/source/libs/stream/src/streamQueue.c +++ b/source/libs/stream/src/streamQueue.c @@ -168,10 +168,37 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char* #endif int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) { - int32_t retryTimes = 0; - int32_t MAX_RETRY_TIMES = 5; + int32_t retryTimes = 0; + int32_t MAX_RETRY_TIMES = 5; const char* id = pTask->id.idStr; + if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one + while (1) { + if (streamTaskShouldPause(&pTask->status)) { + qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); + return TSDB_CODE_SUCCESS; + } + + SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); + if (qItem == NULL) { + if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) { + taosMsleep(10); + qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id); + continue; + } + + qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id); + return TSDB_CODE_SUCCESS; + } + + qDebug("s-task:%s sink task handle result block one-by-one", id); + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; + } + } + + // non sink task while (1) { if (streamTaskShouldPause(&pTask->status)) { qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks); @@ -191,41 +218,41 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i } // do not merge blocks for sink node and check point data block - if ((pTask->info.taskLevel == TASK_LEVEL__SINK) || - (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) { - - if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) { + if ((qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) { + if (*pInput == NULL) { qDebug("s-task:%s checkpoint msg extracted, start to process immediately", id); + *numOfBlocks = 1; + *pInput = qItem; + return TSDB_CODE_SUCCESS; } else { - qDebug("s-task:%s sink task handle result block one-by-one", id); - } - - *numOfBlocks = 1; - *pInput = qItem; - return TSDB_CODE_SUCCESS; - } - - if (*pInput == NULL) { - ASSERT((*numOfBlocks) == 0); - *pInput = qItem; - } else { - // todo we need to sort the data block, instead of just appending into the array list. - void* newRet = streamMergeQueueItem(*pInput, qItem); - if (newRet == NULL) { - qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + // previous existed blocks needs to be handle, before handle the checkpoint msg block + qDebug("s-task:%s checkpoint msg extracted, handle previous block first, numOfBlocks:%d", id, *numOfBlocks); streamQueueProcessFail(pTask->inputQueue); return TSDB_CODE_SUCCESS; } + } else { + if (*pInput == NULL) { + ASSERT((*numOfBlocks) == 0); + *pInput = qItem; + } else { + // todo we need to sort the data block, instead of just appending into the array list. + void* newRet = streamMergeQueueItem(*pInput, qItem); + if (newRet == NULL) { + qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks); + streamQueueProcessFail(pTask->inputQueue); + return TSDB_CODE_SUCCESS; + } - *pInput = newRet; - } + *pInput = newRet; + } - *numOfBlocks += 1; - streamQueueProcessSuccess(pTask->inputQueue); + *numOfBlocks += 1; + streamQueueProcessSuccess(pTask->inputQueue); - if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { - qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); - return TSDB_CODE_SUCCESS; + if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) { + qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM); + return TSDB_CODE_SUCCESS; + } } } } diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 1446198b16..e5994d718b 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -665,13 +665,13 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) { pRange->range.minVer = 0; pRange->range.maxVer = ver; - qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 + qDebug("s-task:%s level:%d related fill-history task exists, update stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); } else { SHistDataRange* pRange = &pTask->dataRange; - qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64 + qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64 ", ver range:%" PRId64 " - %" PRId64, pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer); }