fix(stream): handle block before checkpoint msg in the inputQ.
This commit is contained in:
parent
dea2e73d2e
commit
8c8b1a6d79
|
@ -168,10 +168,37 @@ SStreamQueueItem* doReadMultiBlocksFromQueue(SQueueReader* pReader, const char*
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks) {
|
||||||
int32_t retryTimes = 0;
|
int32_t retryTimes = 0;
|
||||||
int32_t MAX_RETRY_TIMES = 5;
|
int32_t MAX_RETRY_TIMES = 5;
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
|
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SINK) { // extract block from inputQ, one-by-one
|
||||||
|
while (1) {
|
||||||
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
|
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue);
|
||||||
|
if (qItem == NULL) {
|
||||||
|
if (pTask->info.taskLevel == TASK_LEVEL__SOURCE && (++retryTimes) < MAX_RETRY_TIMES) {
|
||||||
|
taosMsleep(10);
|
||||||
|
qDebug("===stream===try again batchSize:%d, retry:%d, %s", *numOfBlocks, retryTimes, id);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("===stream===break batchSize:%d, %s", *numOfBlocks, id);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
|
qDebug("s-task:%s sink task handle result block one-by-one", id);
|
||||||
|
*numOfBlocks = 1;
|
||||||
|
*pInput = qItem;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// non sink task
|
||||||
while (1) {
|
while (1) {
|
||||||
if (streamTaskShouldPause(&pTask->status)) {
|
if (streamTaskShouldPause(&pTask->status)) {
|
||||||
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
qDebug("s-task:%s task should pause, extract input blocks:%d", pTask->id.idStr, *numOfBlocks);
|
||||||
|
@ -191,41 +218,41 @@ int32_t extractBlocksFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, i
|
||||||
}
|
}
|
||||||
|
|
||||||
// do not merge blocks for sink node and check point data block
|
// do not merge blocks for sink node and check point data block
|
||||||
if ((pTask->info.taskLevel == TASK_LEVEL__SINK) ||
|
if ((qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) {
|
||||||
(qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER)) {
|
if (*pInput == NULL) {
|
||||||
|
|
||||||
if (qItem->type == STREAM_INPUT__CHECKPOINT || qItem->type == STREAM_INPUT__CHECKPOINT_TRIGGER) {
|
|
||||||
qDebug("s-task:%s checkpoint msg extracted, start to process immediately", id);
|
qDebug("s-task:%s checkpoint msg extracted, start to process immediately", id);
|
||||||
|
*numOfBlocks = 1;
|
||||||
|
*pInput = qItem;
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
} else {
|
} else {
|
||||||
qDebug("s-task:%s sink task handle result block one-by-one", id);
|
// previous existed blocks needs to be handle, before handle the checkpoint msg block
|
||||||
}
|
qDebug("s-task:%s checkpoint msg extracted, handle previous block first, numOfBlocks:%d", id, *numOfBlocks);
|
||||||
|
|
||||||
*numOfBlocks = 1;
|
|
||||||
*pInput = qItem;
|
|
||||||
return TSDB_CODE_SUCCESS;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (*pInput == NULL) {
|
|
||||||
ASSERT((*numOfBlocks) == 0);
|
|
||||||
*pInput = qItem;
|
|
||||||
} else {
|
|
||||||
// todo we need to sort the data block, instead of just appending into the array list.
|
|
||||||
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
|
||||||
if (newRet == NULL) {
|
|
||||||
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
|
||||||
streamQueueProcessFail(pTask->inputQueue);
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
if (*pInput == NULL) {
|
||||||
|
ASSERT((*numOfBlocks) == 0);
|
||||||
|
*pInput = qItem;
|
||||||
|
} else {
|
||||||
|
// todo we need to sort the data block, instead of just appending into the array list.
|
||||||
|
void* newRet = streamMergeQueueItem(*pInput, qItem);
|
||||||
|
if (newRet == NULL) {
|
||||||
|
qError("s-task:%s failed to merge blocks from inputQ, numOfBlocks:%d", id, *numOfBlocks);
|
||||||
|
streamQueueProcessFail(pTask->inputQueue);
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
*pInput = newRet;
|
*pInput = newRet;
|
||||||
}
|
}
|
||||||
|
|
||||||
*numOfBlocks += 1;
|
*numOfBlocks += 1;
|
||||||
streamQueueProcessSuccess(pTask->inputQueue);
|
streamQueueProcessSuccess(pTask->inputQueue);
|
||||||
|
|
||||||
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
if (*numOfBlocks >= MAX_STREAM_EXEC_BATCH_NUM) {
|
||||||
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
qDebug("s-task:%s batch size limit:%d reached, start to process blocks", id, MAX_STREAM_EXEC_BATCH_NUM);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -665,13 +665,13 @@ void streamPrepareNdoCheckDownstream(SStreamTask* pTask) {
|
||||||
pRange->range.minVer = 0;
|
pRange->range.minVer = 0;
|
||||||
pRange->range.maxVer = ver;
|
pRange->range.maxVer = ver;
|
||||||
|
|
||||||
qDebug("s-task:%s level:%d fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
|
qDebug("s-task:%s level:%d related fill-history task exists, update stream time window:%" PRId64 " - %" PRId64
|
||||||
", ver range:%" PRId64 " - %" PRId64,
|
", ver range:%" PRId64 " - %" PRId64,
|
||||||
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
|
pTask->id.idStr, pTask->info.taskLevel, pRange->window.skey, pRange->window.ekey, pRange->range.minVer,
|
||||||
pRange->range.maxVer);
|
pRange->range.maxVer);
|
||||||
} else {
|
} else {
|
||||||
SHistDataRange* pRange = &pTask->dataRange;
|
SHistDataRange* pRange = &pTask->dataRange;
|
||||||
qDebug("s-task:%s no associated scan-history task, stream time window:%" PRId64 " - %" PRId64
|
qDebug("s-task:%s no related fill-history task, stream time window:%" PRId64 " - %" PRId64
|
||||||
", ver range:%" PRId64 " - %" PRId64,
|
", ver range:%" PRId64 " - %" PRId64,
|
||||||
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
pTask->id.idStr, pRange->window.skey, pRange->window.ekey, pRange->range.minVer, pRange->range.maxVer);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue