diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6047f74ab2..a0a7d5caf7 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -307,15 +307,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem); - if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; @@ -340,10 +332,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } -#if 0 - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); -#endif - return 0; }