fix(stream): fix the error when no agg tasks exist.
This commit is contained in:
parent
0d38f389ab
commit
a81cc9aebf
|
@ -509,24 +509,34 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
}
|
||||
}
|
||||
|
||||
// transfer the ownership of executor state
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
qDebug("s-task:%s add transfer-state block into outputQ", id);
|
||||
} else {
|
||||
qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
|
||||
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
|
||||
}
|
||||
|
||||
// dispatch the tran-state block to downstream task immediately
|
||||
int32_t type = pTask->outputInfo.type;
|
||||
if ((level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) &&
|
||||
(type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH)) {
|
||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
||||
if (code == 0) {
|
||||
streamDispatchStreamBlock(pTask);
|
||||
|
||||
// transfer the ownership of executor state
|
||||
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
|
||||
if (level == TASK_LEVEL__SOURCE) {
|
||||
qDebug("s-task:%s add transfer-state block into outputQ", id);
|
||||
} else {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
qDebug("s-task:%s all upstream tasks send transfer-state block, add transfer-state block into outputQ", id);
|
||||
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
|
||||
}
|
||||
|
||||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SOURCE) {
|
||||
pBlock->srcVgId = pTask->pMeta->vgId;
|
||||
code = taosWriteQitem(pTask->outputInfo.queue->queue, pBlock);
|
||||
if (code == 0) {
|
||||
streamDispatchStreamBlock(pTask);
|
||||
} else {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
}
|
||||
}
|
||||
} else { // non-dispatch task, do task state transfer directly
|
||||
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
|
||||
ASSERT(pTask->info.fillHistory == 1);
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
atomic_store_8(&pTask->status.schedStatus, TASK_SCHED_STATUS__INACTIVE);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue