Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/TD-19042
This commit is contained in:
commit
96b8431f19
|
@ -129,11 +129,11 @@ int32_t streamTaskEnqueueBlocks(SStreamTask* pTask, const SStreamDispatchReq* pR
|
|||
qDebug("vgId:%d, s-task:%s failed to receive dispatch msg, reason: out of memory", pTask->pMeta->vgId,
|
||||
pTask->id.idStr);
|
||||
} else {
|
||||
if (tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock) == 0) {
|
||||
status = TASK_INPUT_STATUS__NORMAL;
|
||||
} else { // input queue is full, upstream is blocked now
|
||||
status = TASK_INPUT_STATUS__BLOCKED;
|
||||
}
|
||||
int32_t code = tAppendDataToInputQueue(pTask, (SStreamQueueItem*)pBlock);
|
||||
// input queue is full, upstream is blocked now
|
||||
status = (code == TSDB_CODE_SUCCESS)? TASK_INPUT_STATUS__NORMAL:TASK_INPUT_STATUS__BLOCKED;
|
||||
|
||||
|
||||
}
|
||||
|
||||
// rsp by input status
|
||||
|
@ -303,6 +303,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
taosFreeQitem(pItem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||
|
@ -310,6 +311,8 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
|||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, total,
|
||||
size);
|
||||
destroyStreamDataBlock((SStreamDataBlock*) pItem);
|
||||
taosFreeQitem(pItem);
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue