fix(stream): fix memory leak.
This commit is contained in:
parent
9b3b03caab
commit
49aff4571b
|
@ -617,11 +617,10 @@ static int32_t mndPersistTaskDropReq(SMnode *pMnode, STrans *pTrans, SStreamTask
|
|||
pReq->taskId = pTask->id.taskId;
|
||||
pReq->streamId = pTask->id.streamId;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
|
||||
SEpSet epset = {0};
|
||||
bool hasEpset = false;
|
||||
int32_t code = extractNodeEpset(pMnode, &epset, &hasEpset, pTask->id.taskId, pTask->info.nodeId);
|
||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
|
||||
if (code != TSDB_CODE_SUCCESS || !hasEpset) { // no valid epset, return directly without redoAction
|
||||
terrno = code;
|
||||
return -1;
|
||||
}
|
||||
|
@ -940,17 +939,10 @@ static int32_t mndProcessStreamCheckpointTrans(SMnode *pMnode, SStreamObj *pStre
|
|||
for (int32_t j = 0; j < sz; j++) {
|
||||
SStreamTask *pTask = taosArrayGetP(pLevel, j);
|
||||
|
||||
SVgObj *pVgObj = mndAcquireVgroup(pMnode, pTask->info.nodeId);
|
||||
if (pVgObj == NULL) {
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
goto _ERR;
|
||||
}
|
||||
|
||||
void *buf;
|
||||
int32_t tlen;
|
||||
if (mndBuildStreamCheckpointSourceReq(&buf, &tlen, pTask->info.nodeId, checkpointId, pTask->id.streamId,
|
||||
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
|
||||
mndReleaseVgroup(pMnode, pVgObj);
|
||||
pTask->id.taskId, pTrans->id, mndTrigger) < 0) {
|
||||
taosWUnLockLatch(&pStream->lock);
|
||||
goto _ERR;
|
||||
}
|
||||
|
|
|
@ -23,7 +23,7 @@ static int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle);
|
|||
static int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId);
|
||||
static bool handleFillhistoryScanComplete(SStreamTask* pTask, int64_t ver);
|
||||
static bool taskReadyForDataFromWal(SStreamTask* pTask);
|
||||
static bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
|
||||
static bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems);
|
||||
static int32_t tqScanWalInFuture(STQ* pTq, int32_t numOfTasks, int32_t idleDuration);
|
||||
|
||||
// extract data blocks(submit/delete) from WAL, and add them into the input queue for all the sources tasks.
|
||||
|
@ -223,6 +223,14 @@ int32_t setWalReaderStartOffset(SStreamTask* pTask, int32_t vgId) {
|
|||
// append the data for the stream
|
||||
tqDebug("vgId:%d s-task:%s wal reader initial seek to ver:%" PRId64, vgId, pTask->id.idStr,
|
||||
pTask->chkInfo.nextProcessVer);
|
||||
} else if (currentVer != pTask->chkInfo.nextProcessVer) {
|
||||
int32_t code = walReaderSeekVer(pTask->exec.pWalReader, pTask->chkInfo.nextProcessVer);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
return code;
|
||||
}
|
||||
|
||||
tqDebug("vgId:%d s-task:%s wal reader seek back to ver:%" PRId64, vgId, pTask->id.idStr,
|
||||
pTask->chkInfo.nextProcessVer);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -300,7 +308,7 @@ bool taskReadyForDataFromWal(SStreamTask* pTask) {
|
|||
return true;
|
||||
}
|
||||
|
||||
bool doPutDataIntoInputQFromWal(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
|
||||
bool doPutDataIntoInputQ(SStreamTask* pTask, int64_t maxVer, int32_t* numOfItems) {
|
||||
const char* id = pTask->id.idStr;
|
||||
int32_t numOfNewItems = 0;
|
||||
|
||||
|
@ -399,7 +407,7 @@ int32_t doScanWalForAllTasks(SStreamMeta* pStreamMeta, bool* pScanIdle) {
|
|||
continue;
|
||||
}
|
||||
|
||||
bool hasNewData = doPutDataIntoInputQFromWal(pTask, maxVer, &numOfItems);
|
||||
bool hasNewData = doPutDataIntoInputQ(pTask, maxVer, &numOfItems);
|
||||
taosThreadMutexUnlock(&pTask->lock);
|
||||
|
||||
if ((numOfItems > 0) || hasNewData) {
|
||||
|
|
Loading…
Reference in New Issue