From 49aff4571b351becb28b76d1ab007a6775391d4d Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Jan 2024 11:33:07 +0800 Subject: [PATCH] fix(stream): fix memory leak. --- source/dnode/mnode/impl/src/mndStream.c | 16 ++++------------ source/dnode/vnode/src/tq/tqStreamTask.c | 14 +++++++++++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 0392f51f51..9c92d036b7 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -617,11 +617,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask pReq->taskId = pTask->id.taskId; pReq->streamId = pTask->id.streamId; - SEpSet epset = {0}; - bool hasEpset = false; - + SEpSet epset = {0}; + bool hasEpset = false; int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId); - if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction + if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction terrno = code; return -1; } @@ -940,17 +939,10 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre for (int32_t j = 0; j < sz; j++) { SStreamTask *pTask = taosArrayGetP(pLevel, j); - SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId); - if (pVgObj == NULL) { - taosWUnLockLatch(&pStream->lock); - goto _ERR; - } - void *buf; int32_t tlen; if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId, - pTask->id.taskId, pTrans->id, mndTrigger) < 0) { - mndReleaseVgroup(pMnode, pVgObj); + pTask->id.taskId, pTrans->id, mndTrigger) < 0) { taosWUnLockLatch(&pStream->lock); goto _ERR; } diff --git a/source/dnode/vnode/src/tq/tqStreamTask.c b/source/dnode/vnode/src/tq/tqStreamTask.c index cdb5cc26f8..d24dc45624 100644 --- a/source/dnode/vnode/src/tq/tqStreamTask.c +++ b/source/dnode/vnode/src/tq/tqStreamTask.c @@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle); static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId); static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver); static bool taskReadyForDataFromWal(SStreamTask* pTask); -static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); +static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems); static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration); // extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks. @@ -223,6 +223,14 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) { // append the data for the stream tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr, pTask->chkInfo.nextProcessVer); + } else if (currentVer != pTask->chkInfo.nextProcessVer) { + int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer); + if (code != TSDB_CODE_SUCCESS) { + return code; + } + + tqDebug("vgId:%d s-task:%s wal reader seek back to ver:%" PRId64, vgId, pTask->id.idStr, + pTask->chkInfo.nextProcessVer); } } @@ -300,7 +308,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) { return true; } -bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { +bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) { const char* id = pTask->id.idStr; int32_t numOfNewItems = 0; @@ -399,7 +407,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) { continue; } - bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems); + bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems); taosThreadMutexUnlock(&pTask->lock); if ((numOfItems > 0) || hasNewData) {