fix(stream): fix invalid read and memory leak.
This commit is contained in:
parent
65f66f0986
commit
6a98b11bef
|
@ -824,6 +824,8 @@ void tqSinkDataIntoDstTable(SStreamTask* pTask, void* vnode, void* data) {
|
||||||
bool hasSubmit = false;
|
bool hasSubmit = false;
|
||||||
for (int32_t i = 0; i < numOfBlocks; i++) {
|
for (int32_t i = 0; i < numOfBlocks; i++) {
|
||||||
if (streamTaskShouldStop(&pTask->status)) {
|
if (streamTaskShouldStop(&pTask->status)) {
|
||||||
|
taosHashCleanup(pTableIndexMap);
|
||||||
|
tDestroySubmitReq(&submitReq, TSDB_MSG_FLG_ENCODE);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -165,17 +165,17 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
||||||
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
|
SStreamDataBlock* pBlockSrc = (SStreamDataBlock*)pElem;
|
||||||
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
|
taosArrayAddAll(pBlock->blocks, pBlockSrc->blocks);
|
||||||
taosArrayDestroy(pBlockSrc->blocks);
|
taosArrayDestroy(pBlockSrc->blocks);
|
||||||
taosFreeQitem(pElem);
|
|
||||||
|
|
||||||
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
|
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
|
||||||
|
|
||||||
|
taosFreeQitem(pElem);
|
||||||
return dst;
|
return dst;
|
||||||
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (dst->type == STREAM_INPUT__MERGED_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
|
SStreamMergedSubmit* pMerged = (SStreamMergedSubmit*)dst;
|
||||||
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
|
SStreamDataSubmit* pBlockSrc = (SStreamDataSubmit*)pElem;
|
||||||
streamMergeSubmit(pMerged, pBlockSrc);
|
streamMergeSubmit(pMerged, pBlockSrc);
|
||||||
taosFreeQitem(pElem);
|
|
||||||
|
|
||||||
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
|
streamQueueItemIncSize(dst, streamQueueItemGetSize(pElem));
|
||||||
|
|
||||||
|
taosFreeQitem(pElem);
|
||||||
return dst;
|
return dst;
|
||||||
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
} else if (dst->type == STREAM_INPUT__DATA_SUBMIT && pElem->type == STREAM_INPUT__DATA_SUBMIT) {
|
||||||
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
|
SStreamMergedSubmit* pMerged = streamMergedSubmitNew();
|
||||||
|
@ -188,9 +188,9 @@ SStreamQueueItem* streamMergeQueueItem(SStreamQueueItem* dst, SStreamQueueItem*
|
||||||
|
|
||||||
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
|
streamMergeSubmit(pMerged, (SStreamDataSubmit*)dst);
|
||||||
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
|
streamMergeSubmit(pMerged, (SStreamDataSubmit*)pElem);
|
||||||
|
|
||||||
taosFreeQitem(dst);
|
taosFreeQitem(dst);
|
||||||
taosFreeQitem(pElem);
|
taosFreeQitem(pElem);
|
||||||
|
|
||||||
return (SStreamQueueItem*)pMerged;
|
return (SStreamQueueItem*)pMerged;
|
||||||
} else {
|
} else {
|
||||||
stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type);
|
stDebug("block type:%s not merged with existed blocks list, type:%d", streamQueueItemGetTypeStr(pElem->type), dst->type);
|
||||||
|
|
Loading…
Reference in New Issue