refactor: always successfully put the retrieve msg even though the input queue is full already.
This commit is contained in:
parent
26334df090
commit
044f6fbcca
|
@ -304,8 +304,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
// use the local variable to avoid the pItem be freed by other threads, since it has been put into queue already.
|
||||||
stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
stDebug("s-task:%s submit enqueue msgLen:%d ver:%" PRId64 ", total in queue:%d, size:%.2fMiB", pTask->id.idStr,
|
||||||
msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
|
msgLen, ver, total, size + SIZE_IN_MiB(msgLen));
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
|
||||||
if (streamQueueIsFull(pTask->inputq.queue)) {
|
if (streamQueueIsFull(pTask->inputq.queue)) {
|
||||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||||
|
|
||||||
|
@ -324,7 +323,7 @@ int32_t streamTaskPutDataIntoInputQ(SStreamTask* pTask, SStreamQueueItem* pItem)
|
||||||
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
double size = SIZE_IN_MiB(taosQueueMemorySize(pQueue));
|
||||||
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
stDebug("s-task:%s blockdata enqueue, total in queue:%d, size:%.2fMiB", pTask->id.idStr, total, size);
|
||||||
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
} else if (type == STREAM_INPUT__CHECKPOINT || type == STREAM_INPUT__CHECKPOINT_TRIGGER ||
|
||||||
type == STREAM_INPUT__TRANS_STATE) {
|
type == STREAM_INPUT__TRANS_STATE || type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||||
int32_t code = taosWriteQitem(pQueue, pItem);
|
int32_t code = taosWriteQitem(pQueue, pItem);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
streamFreeQitem(pItem);
|
streamFreeQitem(pItem);
|
||||||
|
|
Loading…
Reference in New Issue