fix(stream): fix memory leak.
This commit is contained in:
parent
a81cc9aebf
commit
98f40325e9
|
@ -77,6 +77,8 @@ int32_t sndExpandTask(SSnode *pSnode, SStreamTask *pTask, int64_t ver) {
|
|||
pTask->chkInfo.version = ver;
|
||||
pTask->pMeta = pSnode->pMeta;
|
||||
|
||||
streamTaskOpenAllUpstreamInput(pTask);
|
||||
|
||||
pTask->pState = streamStateOpen(pSnode->path, pTask, false, -1, -1);
|
||||
if (pTask->pState == NULL) {
|
||||
return -1;
|
||||
|
|
|
@ -15,45 +15,6 @@
|
|||
|
||||
#include "streamInt.h"
|
||||
|
||||
SStreamDataBlock* createStreamBlockFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
|
||||
if (pData == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
pData->type = blockType;
|
||||
pData->srcVgId = srcVg;
|
||||
pData->srcTaskId = pReq->upstreamTaskId;
|
||||
|
||||
int32_t blockNum = pReq->blockNum;
|
||||
SArray* pArray = taosArrayInit_s(sizeof(SSDataBlock), blockNum);
|
||||
if (pArray == NULL) {
|
||||
taosFreeQitem(pData);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
ASSERT((pReq->blockNum == taosArrayGetSize(pReq->data)) && (pReq->blockNum == taosArrayGetSize(pReq->dataLen)));
|
||||
|
||||
for (int32_t i = 0; i < blockNum; i++) {
|
||||
SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*) taosArrayGetP(pReq->data, i);
|
||||
SSDataBlock* pDataBlock = taosArrayGet(pArray, i);
|
||||
blockDecode(pDataBlock, pRetrieve->data);
|
||||
|
||||
// TODO: refactor
|
||||
pDataBlock->info.window.skey = be64toh(pRetrieve->skey);
|
||||
pDataBlock->info.window.ekey = be64toh(pRetrieve->ekey);
|
||||
pDataBlock->info.version = be64toh(pRetrieve->version);
|
||||
pDataBlock->info.watermark = be64toh(pRetrieve->watermark);
|
||||
memcpy(pDataBlock->info.parTbName, pRetrieve->parTbName, TSDB_TABLE_NAME_LEN);
|
||||
|
||||
pDataBlock->info.type = pRetrieve->streamBlockType;
|
||||
pDataBlock->info.childId = pReq->upstreamChildId;
|
||||
}
|
||||
|
||||
pData->blocks = pArray;
|
||||
return pData;
|
||||
}
|
||||
|
||||
SStreamDataBlock* createStreamDataFromDispatchMsg(const SStreamDispatchReq* pReq, int32_t blockType, int32_t srcVg) {
|
||||
SStreamDataBlock* pData = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, pReq->totalLen);
|
||||
if (pData == NULL) {
|
||||
|
@ -243,7 +204,7 @@ void streamFreeQitem(SStreamQueueItem* data) {
|
|||
if (type == STREAM_INPUT__GET_RES) {
|
||||
blockDataDestroy(((SStreamTrigger*)data)->pBlock);
|
||||
taosFreeQitem(data);
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE) {
|
||||
} else if (type == STREAM_INPUT__DATA_BLOCK || type == STREAM_INPUT__DATA_RETRIEVE || type == STREAM_INPUT__TRANS_STATE) {
|
||||
taosArrayDestroyEx(((SStreamDataBlock*)data)->blocks, (FDelete)blockDataFreeRes);
|
||||
taosFreeQitem(data);
|
||||
} else if (type == STREAM_INPUT__DATA_SUBMIT) {
|
||||
|
|
|
@ -774,7 +774,7 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
|
|||
if (pRsp->inputStatus == TASK_INPUT_STATUS__BLOCKED) {
|
||||
pTask->inputStatus = TASK_INPUT_STATUS__BLOCKED; // block the input of current task, to push pressure to upstream
|
||||
pTask->msgInfo.blockingTs = taosGetTimestampMs(); // record the blocking start time
|
||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 "wait for %dms and retry dispatch data",
|
||||
qError("s-task:%s inputQ of downstream task:0x%x is full, time:%" PRId64 " wait for %dms and retry dispatch data",
|
||||
id, pRsp->downstreamTaskId, pTask->msgInfo.blockingTs, DISPATCH_RETRY_INTERVAL_MS);
|
||||
streamRetryDispatchStreamBlock(pTask, DISPATCH_RETRY_INTERVAL_MS);
|
||||
} else { // pipeline send data in output queue
|
||||
|
|
|
@ -504,6 +504,7 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
if (level == TASK_LEVEL__AGG || level == TASK_LEVEL__SINK) {
|
||||
int32_t remain = streamAlignTransferState(pTask);
|
||||
if (remain > 0) {
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
qDebug("s-task:%s receive upstream transfer state msg, remain:%d", id, remain);
|
||||
return 0;
|
||||
}
|
||||
|
@ -532,6 +533,8 @@ int32_t streamProcessTranstateBlock(SStreamTask* pTask, SStreamDataBlock* pBlock
|
|||
}
|
||||
} else { // non-dispatch task, do task state transfer directly
|
||||
qDebug("s-task:%s non-dispatch task, start to transfer state directly", id);
|
||||
|
||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||
ASSERT(pTask->info.fillHistory == 1);
|
||||
code = streamTransferStateToStreamTask(pTask);
|
||||
|
||||
|
|
Loading…
Reference in New Issue