use stream task
This commit is contained in:
parent
c81382dcb9
commit
9cb481dd6e
|
@ -364,14 +364,15 @@ static int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
|
||||||
|
|
||||||
// 7. pause allowed.
|
// 7. pause allowed.
|
||||||
streamTaskEnablePause(pStreamTask);
|
streamTaskEnablePause(pStreamTask);
|
||||||
if (taosQueueEmpty(pTask->inputQueue->queue)) {
|
if (taosQueueEmpty(pStreamTask->inputQueue->queue)) {
|
||||||
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);;
|
SStreamRefDataBlock* pItem = taosAllocateQitem(sizeof(SStreamRefDataBlock), DEF_QITEM, 0);;
|
||||||
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
SSDataBlock* pDelBlock = createSpecialDataBlock(STREAM_DELETE_DATA);
|
||||||
pDelBlock->info.rows = 0;
|
pDelBlock->info.rows = 0;
|
||||||
pDelBlock->info.version = 0;
|
pDelBlock->info.version = 0;
|
||||||
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
|
pItem->type = STREAM_INPUT__REF_DATA_BLOCK;
|
||||||
pItem->pBlock = pDelBlock;
|
pItem->pBlock = pDelBlock;
|
||||||
tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pItem);
|
int32_t code = tAppendDataToInputQueue(pStreamTask, (SStreamQueueItem*)pItem);
|
||||||
|
qDebug("s-task:%s append dummy delete block,res:%d", pStreamTask->id.idStr, code);
|
||||||
}
|
}
|
||||||
|
|
||||||
streamSchedExec(pStreamTask);
|
streamSchedExec(pStreamTask);
|
||||||
|
|
Loading…
Reference in New Issue