refactor: update some logs.
This commit is contained in:
parent
c141abfbb8
commit
d026e7ef16
|
@ -305,7 +305,7 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
|
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
||||||
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
||||||
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(size:%d, num:%.2fMiB) abort", pTask->id.idStr,
|
qError("s-task:%s input queue is full, capacity(size:%d num:%dMiB), current(blocks:%d, size:%.2fMiB) abort", pTask->id.idStr,
|
||||||
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
|
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE,
|
||||||
numOfBlocks, size);
|
numOfBlocks, size);
|
||||||
streamDataSubmitDestroy(pSubmitBlock);
|
streamDataSubmitDestroy(pSubmitBlock);
|
||||||
|
@ -316,10 +316,13 @@ int32_t tAppendDataToInputQueue(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
int32_t numOfBlocks = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) && (numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY ||
|
double size = taosQueueMemorySize(pTask->inputQueue->queue) / 1048576.0;
|
||||||
(numOfBlocks >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
|
||||||
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, abort", pTask->id.idStr,
|
if ((pTask->taskLevel == TASK_LEVEL__SOURCE) &&
|
||||||
STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE);
|
(numOfBlocks > STREAM_TASK_INPUT_QUEUEU_CAPACITY || (size >= STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE))) {
|
||||||
|
qError("s-task:%s input queue is full, capacity:%d size:%d MiB, current(blocks:%d, size:%.2fMiB) abort",
|
||||||
|
pTask->id.idStr, STREAM_TASK_INPUT_QUEUEU_CAPACITY, STREAM_TASK_INPUT_QUEUEU_CAPACITY_IN_SIZE, numOfBlocks,
|
||||||
|
size);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue