fix(tmq):fix the invalid read.
This commit is contained in:
parent
c91d8a5901
commit
bd4c33d1c2
|
@ -287,12 +287,12 @@ int32_t tAppendDataForStream(SStreamTask* pTask, SStreamQueueItem* pItem) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
|
int32_t total = taosQueueItemSize(pTask->inputQueue->queue) + 1;
|
||||||
|
|
||||||
int32_t total = taosQueueItemSize(pTask->inputQueue->queue);
|
|
||||||
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
|
qDebug("stream task:%d %p submit enqueue %p %p %p msgLen:%d ver:%" PRId64 ", total in queue:%d", pTask->taskId,
|
||||||
pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
|
pTask, pItem, pSubmitBlock, pSubmitBlock->submit.msgStr, pSubmitBlock->submit.msgLen,
|
||||||
pSubmitBlock->submit.ver, total);
|
pSubmitBlock->submit.ver, total);
|
||||||
|
|
||||||
|
taosWriteQitem(pTask->inputQueue->queue, pSubmitBlock);
|
||||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE ||
|
||||||
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
type == STREAM_INPUT__REF_DATA_BLOCK) {
|
||||||
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
taosWriteQitem(pTask->inputQueue->queue, pItem);
|
||||||
|
|
Loading…
Reference in New Issue