enh(stream): avoid clone submit block.

This commit is contained in:
Haojun Liao 2023-05-15 10:11:08 +08:00
parent ac287572ef
commit 3b6e052c6b
1 changed files with 1 additions and 13 deletions

View File

@ -307,15 +307,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
return -1; return -1;
} }
SStreamDataSubmit* pSubmitBlock = streamSubmitBlockClone((SStreamDataSubmit*)pItem); taosWriteQitem(pTask->inputQueue->queue, pItem);
if (pSubmitBlock == NULL) {
qDebug("task %d %p submit enqueue failed since out of memory", pTask->id.taskId, pTask);
terrno = TSDB_CODE_OUT_OF_MEMORY;
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__FAILED);
return -1;
}
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || } else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
type == STREAM_INPUT__REF_DATA_BLOCK) { type == STREAM_INPUT__REF_DATA_BLOCK) {
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1; int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
@ -340,10 +332,6 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE); atomic_val_compare_exchange_8(&pTask->triggerStatus, TASK_TRIGGER_STATUS__INACTIVE, TASK_TRIGGER_STATUS__ACTIVE);
} }
#if 0
atomic_store_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL);
#endif
return 0; return 0;
} }