From a33e7b2eb3acc21ab7afa3c412f814ac921b108f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 26 Apr 2024 22:22:54 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/stream/inc/streamInt.h | 2 +- source/libs/stream/src/streamCheckStatus.c | 31 +++++++--------------- 2 files changed, 10 insertions(+), 23 deletions(-) diff --git a/source/libs/stream/inc/streamInt.h b/source/libs/stream/inc/streamInt.h index 44fb0706b8..07dce9a451 100644 --- a/source/libs/stream/inc/streamInt.h +++ b/source/libs/stream/inc/streamInt.h @@ -26,7 +26,7 @@ extern "C" { #endif -#define CHECK_RSP_INTERVAL 300 +#define CHECK_RSP_CHECK_INTERVAL 300 #define LAUNCH_HTASK_INTERVAL 100 #define WAIT_FOR_MINIMAL_INTERVAL 100.00 #define MAX_RETRY_LAUNCH_HISTORY_TASK 40 diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index d7960ee725..0a87833055 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -55,7 +55,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { - streamTaskPrepareMonitorCheckRsp(pTask); + streamTaskStartMonitorCheckRsp(pTask); STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher; @@ -69,9 +69,8 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamSendCheckMsg(pTask, &req, pTask->outputInfo.fixedDispatcher.nodeId, &pTask->outputInfo.fixedDispatcher.epSet); - streamTaskStartMonitorCheckRsp(pTask); } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { - streamTaskPrepareMonitorCheckRsp(pTask); + streamTaskStartMonitorCheckRsp(pTask); SArray* vgInfo = pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos; @@ -90,9 +89,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { idstr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); } - - // the check rsp monitor timer must be invoked here - streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", idstr, pTask->info.nodeId); streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, idstr); @@ -165,38 +161,30 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamTaskPrepareMonitorCheckRsp(SStreamTask* pTask) { - /*SStreamTask* p = */streamMetaAcquireOneTask(pTask); // add task ref here +int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - taosThreadMutexLock(&pInfo->checkInfoLock); + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); if (code != TSDB_CODE_SUCCESS) { taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamMetaReleaseTask(pTask->pMeta, pTask); return TSDB_CODE_FAILED; } + /*SStreamTask* p = */ streamMetaAcquireOneTask(pTask); // add task ref here streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; -} -int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { - STaskCheckInfo* pInfo = &pTask->taskCheckInfo; - - taosThreadMutexLock(&pInfo->checkInfoLock); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); if (pInfo->checkRspTmr == NULL) { - pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer); } else { - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); } taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + return 0; } int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { @@ -376,7 +364,6 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, int32_t vgId, const char* id) { SDownstreamStatusInfo info = {.taskId = taskId, .status = -1, .vgId = vgId, .reqId = reqId, .rspTs = 0}; - taosThreadMutexLock(&pInfo->checkInfoLock); SDownstreamStatusInfo* p = findCheckRspStatus(pInfo, taskId); @@ -649,7 +636,7 @@ void rspMonitorFn(void* param, void* tmrId) { handleTimeoutDownstreamTasks(pTask, pTimeoutList); } - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); + taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",