diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 9eab33cee8..d619351a93 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -972,7 +972,7 @@ static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** return 0; } -static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) { +static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) { const char* id = pTask->id.idStr; SArray* pList = pTask->upstreamInfo.pList; // send msg to retrieve checkpoint trigger msg SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo; @@ -984,7 +984,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA return code; } - code = doFindNotSendUpstream(pTask, pList, &pNotSendList); + code = doFindNotSendUpstream(pTask, pList, ppNotSendList); if (code) { streamCleanBeforeQuitTmr(pTmrInfo, param); stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code)); @@ -992,7 +992,7 @@ static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SA } // do send retrieve checkpoint trigger msg to upstream - code = doSendRetrieveTriggerMsg(pTask, pNotSendList); + code = doSendRetrieveTriggerMsg(pTask, *ppNotSendList); if (code) { stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code)); code = 0; @@ -1064,7 +1064,7 @@ void checkpointTriggerMonitorFn(void* param, void* tmrId) { } streamMutexLock(&pActiveInfo->lock); - code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList); + code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList); streamMutexUnlock(&pActiveInfo->lock); if (code != TSDB_CODE_SUCCESS) {