fix(stream): fix invalid read and memory leak.

This commit is contained in:
Haojun Liao 2023-09-25 23:10:33 +08:00
parent c9576d7a85
commit 5a4a0aa0e3
2 changed files with 7 additions and 5 deletions

View File

@ -824,6 +824,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
bool hasSubmit = false;
for (int32_t i = 0; i < numOfBlocks; i++) {
if (streamTaskShouldStop(&pTask->status)) {
taosHashCleanup(pTableIndexMap);
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
return;
}

View File

@ -165,17 +165,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
taosArrayDestroy(pBlockSrc->blocks);
taosFreeQitem(pElem);
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
taosFreeQitem(pElem);
return dst;
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
streamMergeSubmit(pMerged, pBlockSrc);
taosFreeQitem(pElem);
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
taosFreeQitem(pElem);
return dst;
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
@ -188,9 +188,9 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
taosFreeQitem(dst);
taosFreeQitem(pElem);
return (SStreamQueueItem*)pMerged;
} else {
stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type);