From 3b6e052c6b1d895113eed54e064cb99479703248 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 15 May 2023 10:11:08 +0800 Subject: [PATCH] enh(stream): avoid clone submit block. --- source/libs/stream/src/stream.c | 14 +------------- 1 file changed, 1 insertion(+), 13 deletions(-) diff --git a/source/libs/stream/src/stream.c b/source/libs/stream/src/stream.c index 6047f74ab2..a0a7d5caf7 100644 --- a/source/libs/stream/src/stream.c +++ b/source/libs/stream/src/stream.c @@ -307,15 +307,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { return -1; } - SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem); - if (pSubmitBlock == NULL) { - qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask); - terrno = TSDB_CODE_OUT_OF_MEMORY; - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED); - return -1; - } - - taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock); + taosWriteQitem(pTask->inputQueue->queue, pItem); } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__REF_DATA_BLOCK) { int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; @@ -340,10 +332,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) { atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); } -#if 0 - atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL); -#endif - return 0; }