fix(stream): fix memory leak.
This commit is contained in:
parent
9220ec5a92
commit
cfdba88c53
|
@ -972,7 +972,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray**
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) {
|
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) {
|
||||||
const char* id = pTask->id.idStr;
|
const char* id = pTask->id.idStr;
|
||||||
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
|
SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg
|
||||||
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
|
||||||
|
@ -984,7 +984,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
|
code = doFindNotSendUpstream(pTask, pList, ppNotSendList);
|
||||||
if (code) {
|
if (code) {
|
||||||
streamCleanBeforeQuitTmr(pTmrInfo, param);
|
streamCleanBeforeQuitTmr(pTmrInfo, param);
|
||||||
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
|
stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
|
||||||
|
@ -992,7 +992,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA
|
||||||
}
|
}
|
||||||
|
|
||||||
// do send retrieve checkpoint trigger msg to upstream
|
// do send retrieve checkpoint trigger msg to upstream
|
||||||
code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
|
code = doSendRetrieveTriggerMsg(pTask, *ppNotSendList);
|
||||||
if (code) {
|
if (code) {
|
||||||
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
|
||||||
code = 0;
|
code = 0;
|
||||||
|
@ -1064,7 +1064,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
streamMutexLock(&pActiveInfo->lock);
|
streamMutexLock(&pActiveInfo->lock);
|
||||||
code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList);
|
code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList);
|
||||||
streamMutexUnlock(&pActiveInfo->lock);
|
streamMutexUnlock(&pActiveInfo->lock);
|
||||||
|
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
|
|
Loading…
Reference in New Issue