fix(stream): fix memory leak.

This commit is contained in:
Haojun Liao 2023-09-14 19:33:05 +08:00
parent 6fa54789cb
commit 3d1f29ff21
3 changed files with 33 additions and 53 deletions

View File

@ -852,11 +852,21 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) {
pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer);
}
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64 " nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->info.triggerParam);
if (pTask->info.fillHistory) {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related stream task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->streamTaskId.taskId, pTask->info.triggerParam);
} else {
tqInfo("vgId:%d expand stream task, s-task:%s, checkpointId:%" PRId64 " checkpointVer:%" PRId64
" nextProcessVer:%" PRId64
" child id:%d, level:%d, status:%s fill-history:%d, related fill-task:0x%x trigger:%" PRId64 " ms",
vgId, pTask->id.idStr, pChkInfo->checkpointId, pChkInfo->checkpointVer, pChkInfo->nextProcessVer,
pTask->info.selfChildId, pTask->info.taskLevel, streamGetTaskStatusStr(pTask->status.taskStatus),
pTask->info.fillHistory, pTask->historyTaskId.taskId, pTask->info.triggerParam);
}
return 0;
}
@ -1181,44 +1191,6 @@ int32_t tqProcessTaskScanHistory(STQ* pTq, SRpcMsg* pMsg) {
return 0;
}
// notify the downstream tasks to transfer executor state after handle all history blocks.
int32_t tqProcessTaskTransferStateReq(STQ* pTq, SRpcMsg* pMsg) {
char* pReq = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t len = pMsg->contLen - sizeof(SMsgHead);
SStreamTransferReq req = {0};
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)pReq, len);
int32_t code = tDecodeStreamScanHistoryFinishReq(&decoder, &req);
tDecoderClear(&decoder);
tqDebug("vgId:%d start to process transfer state msg, from s-task:0x%x", pTq->pStreamMeta->vgId,
req.downstreamTaskId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, req.streamId, req.downstreamTaskId);
if (pTask == NULL) {
tqError("failed to find task:0x%x, it may have been dropped already. process transfer state failed",
req.downstreamTaskId);
return -1;
}
int32_t remain = streamAlignTransferState(pTask);
if (remain > 0) {
tqDebug("s-task:%s receive upstream transfer state msg, remain:%d", pTask->id.idStr, remain);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// transfer the ownership of executor state
tqDebug("s-task:%s all upstream tasks send transfer msg, open transfer state flag", pTask->id.idStr);
ASSERT(pTask->streamTaskId.taskId != 0 && pTask->info.fillHistory == 1);
streamSchedExec(pTask);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return 0;
}
// only the agg tasks and the sink tasks will receive this message from upstream tasks
int32_t tqProcessTaskScanHistoryFinishReq(STQ* pTq, SRpcMsg* pMsg) {
char* msg = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
@ -1592,6 +1564,10 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t code = 0;
SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader node, ignore checkpoint-source msg", vgId);
return TSDB_CODE_SUCCESS;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, len);
@ -1801,6 +1777,8 @@ int32_t tqProcessTaskUpdateReq(STQ* pTq, SRpcMsg* pMsg) {
vInfo("vgId:%d, restart all stream tasks", vgId);
tqStartStreamTasks(pTq);
tqCheckAndRunStreamTaskAsync(pTq);
} else {
vInfo("vgId:%d, follower node not start stream tasks", vgId);
}
pMeta->taskWillbeLaunched = 0;

View File

@ -358,18 +358,18 @@ int32_t streamDoTransferStateToStreamTask(SStreamTask* pTask) {
streamTaskReleaseState(pTask);
streamTaskReloadState(pStreamTask);
// 3. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
// 4. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// 3. resume the state of stream task, after this function, the stream task will run immidately. But it can not be
// pause, since the pause allowed attribute is not set yet.
streamTaskResumeFromHalt(pStreamTask);
qDebug("s-task:%s fill-history task set status to be dropping, save the state into disk", pTask->id.idStr);
// 5. free it and remove fill-history task from disk meta-store
// 4. free it and remove fill-history task from disk meta-store
streamMetaUnregisterTask(pMeta, pTask->id.streamId, pTask->id.taskId);
// 5. clear the link between fill-history task and stream task info
pStreamTask->historyTaskId.taskId = 0;
// 6. save to disk
taosWLockLatch(&pMeta->lock);
streamMetaSaveTask(pMeta, pStreamTask);

View File

@ -207,9 +207,6 @@ _err:
if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb);
if (pMeta->db) tdbClose(pMeta->db);
// taosThreadMutexDestroy(&pMeta->backendMutex);
// taosThreadRwlockDestroy(&pMeta->lock);
taosMemoryFree(pMeta);
qError("failed to open stream meta");
@ -695,7 +692,6 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
continue;
}
streamTaskResetUpstreamStageInfo(pTask);
if (taosHashPut(pMeta->pTasks, keys, sizeof(keys), &pTask, sizeof(void*)) < 0) {
doClear(pKey, pVal, pCur, pRecycleList);
tFreeStreamTask(pTask);
@ -708,6 +704,7 @@ int32_t streamMetaLoadAllTasks(SStreamMeta* pMeta) {
ASSERT(pTask->status.downstreamReady == 0);
}
qInfo("vgId:%d pause task num:%d", pMeta->vgId, pMeta->pauseTaskNum);
tdbFree(pKey);
@ -939,4 +936,9 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
qDebug("vgId:%d all stream tasks are not in timer, continue close, elapsed time:%" PRId64 " ms", pMeta->vgId, el);
}
void streamMetaStartHb(SStreamMeta* pMeta) { metaHbToMnode(pMeta, NULL); }
void streamMetaStartHb(SStreamMeta* pMeta) {
int64_t* pRid = taosMemoryMalloc(sizeof(int64_t));
metaRefMgtAdd(pMeta->vgId, pRid);
*pRid = pMeta->rid;
metaHbToMnode(pRid, NULL);
}