From 2d1e9ba6317ada5bd39f4272cf68ba2a54a33a07 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 19:11:57 +0800 Subject: [PATCH] fix(stream):add ref for task in check rsp monitor timer. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckStatus.c | 35 ++++++++++++++++----- source/libs/stream/src/streamStartHistory.c | 3 +- 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 3c74a9fd7b..dfe35e50c7 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -848,6 +848,7 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); +int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskCleanupCheckInfo(STaskCheckInfo* pInfo); diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d164d01934..d7960ee725 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -55,7 +55,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - streamTaskStartMonitorCheckRsp(pTask); + streamTaskPrepareMonitorCheckRsp(pTask); STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; @@ -69,8 +69,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { streamTaskStartMonitorCheckRsp(pTask); + } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { + streamTaskPrepareMonitorCheckRsp(pTask); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; @@ -89,6 +90,9 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } + + // the check rsp monitor timer must be invoked here + streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); @@ -161,18 +165,27 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { +int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask) { + /*SStreamTask* p = */streamMetaAcquireOneTask(pTask); // add task ref here STaskCheckInfo* pInfo = &pTask->taskCheckInfo; taosThreadMutexLock(&pInfo->checkInfoLock); int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pTask->pMeta, pTask); return TSDB_CODE_FAILED; } streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_SUCCESS; +} +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + + taosThreadMutexLock(&pInfo->checkInfoLock); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); @@ -183,7 +196,7 @@ int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { } taosThreadMutexUnlock(&pInfo->checkInfoLock); - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { @@ -329,7 +342,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { } stDebug("s-task:%s set the in-check-procedure flag", id); - return 0; + return TSDB_CODE_SUCCESS; } int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* id) { @@ -519,6 +532,7 @@ void handleNotReadyDownstreamTask(SStreamTask* pTask, SArray* pNotReadyList) { void rspMonitorFn(void* param, void* tmrId) { SStreamTask* pTask = param; + SStreamMeta* pMeta = pTask->pMeta; SStreamTaskState* pStat = streamTaskGetStatus(pTask); STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; @@ -546,6 +560,8 @@ void rspMonitorFn(void* param, void* tmrId) { STaskId* pHId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } + + streamMetaReleaseTask(pMeta, pTask); return; } @@ -554,6 +570,7 @@ void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); streamTaskCompleteCheckRsp(pInfo, true, id); + streamMetaReleaseTask(pMeta, pTask); return; } @@ -565,6 +582,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pMeta, pTask); return; } @@ -591,6 +609,7 @@ void rspMonitorFn(void* param, void* tmrId) { streamTaskCompleteCheckRsp(pInfo, false, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); + streamMetaReleaseTask(pMeta, pTask); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); @@ -609,12 +628,14 @@ void rspMonitorFn(void* param, void* tmrId) { taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. - streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { STaskId* pHId = &pTask->hTaskInfo.id; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); + streamMetaAddTaskLaunchResult(pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } + streamMetaReleaseTask(pMeta, pTask); + taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); return; diff --git a/source/libs/stream/src/streamStartHistory.c b/source/libs/stream/src/streamStartHistory.c index 04a99feab0..c76536aedf 100644 --- a/source/libs/stream/src/streamStartHistory.c +++ b/source/libs/stream/src/streamStartHistory.c @@ -85,9 +85,10 @@ void doExecScanhistoryInFuture(void* param, void* tmrId) { SStreamTaskState* p = streamTaskGetStatus(pTask); if (p->state == TASK_STATUS__DROPPING || p->state == TASK_STATUS__STOP) { - streamMetaReleaseTask(pTask->pMeta, pTask); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s not start scan-history again, ref:%d", pTask->id.idStr, p->name, ref); + + streamMetaReleaseTask(pTask->pMeta, pTask); return; }