From a8fac441be5698baf22ece6c0c99817bf242b0ef Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 22 Apr 2024 14:02:31 +0800 Subject: [PATCH] fix(stream): update the check-rsp procedure to avoid repeatedly starting the check-rsp procedure. --- include/libs/stream/tstream.h | 5 +- source/dnode/vnode/src/tqCommon/tqCommon.c | 5 +- source/libs/stream/src/streamStart.c | 11 +-- source/libs/stream/src/streamTask.c | 81 ++++++++++++++-------- 4 files changed, 61 insertions(+), 41 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 0f399da8fd..de7c743b7d 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -444,6 +444,7 @@ typedef struct STaskCheckInfo { int64_t startTs; int32_t notReadyTasks; int32_t inCheckProcess; + int32_t stopCheckProcess; tmr_h checkRspTmr; TdThreadMutex checkInfoLock; } STaskCheckInfo; @@ -844,14 +845,12 @@ int32_t streamTaskSetDb(SStreamMeta* pMeta, void* pTask, char* key); bool streamTaskIsSinkTask(const SStreamTask* pTask); int32_t streamTaskSendCheckpointReq(SStreamTask* pTask); -int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs); int32_t streamTaskAddReqInfo(STaskCheckInfo* pInfo, int64_t reqId, int32_t taskId, const char* id); int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t status, int64_t rspTs, int64_t reqId, int32_t* pNotReady, const char* id); void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo); -int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id); -int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id); int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask); +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id); void streamTaskStatusInit(STaskStatusEntry* pEntry, const SStreamTask* pTask); void streamTaskStatusCopy(STaskStatusEntry* pDst, const STaskStatusEntry* pSrc); diff --git 
a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index 4ce8579ea0..4667cd73b1 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -216,7 +216,8 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM streamTaskUpdateEpsetInfo(pTask, req.pNodeList); streamTaskResetStatus(pTask); - streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); + + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); SStreamTask** ppHTask = NULL; if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { @@ -231,7 +232,7 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM tqDebug("s-task:%s fill-history task update nodeEp along with stream task", (*ppHTask)->id.idStr); streamTaskUpdateEpsetInfo(*ppHTask, req.pNodeList); streamTaskResetStatus(*ppHTask); - streamTaskCompleteCheck(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); + streamTaskStopMonitorCheckRsp(&(*ppHTask)->taskCheckInfo, (*ppHTask)->id.idStr); } } diff --git a/source/libs/stream/src/streamStart.c b/source/libs/stream/src/streamStart.c index b9b7c8ddfa..0c4f00de6a 100644 --- a/source/libs/stream/src/streamStart.c +++ b/source/libs/stream/src/streamStart.c @@ -184,13 +184,6 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { ASSERT(pTask->status.downstreamReady == 0); - int32_t code = streamTaskStartCheckDownstream(&pTask->taskCheckInfo, pTask->id.idStr); - if (code != TSDB_CODE_SUCCESS) { - return; - } - - streamTaskInitTaskCheckInfo(&pTask->taskCheckInfo, &pTask->outputInfo, taosGetTimestampMs()); - // serialize streamProcessScanHistoryFinishRsp if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) { req.reqId = tGenIdPI64(); @@ -230,7 +223,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { streamTaskStartMonitorCheckRsp(pTask); } else { // for sink task, set it ready directly. 
stDebug("s-task:%s (vgId:%d) set downstream ready, since no downstream", pTask->id.idStr, pTask->info.nodeId); - streamTaskCompleteCheck(&pTask->taskCheckInfo, pTask->id.idStr); + streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr); doProcessDownstreamReadyRsp(pTask); } } @@ -405,7 +398,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs if (left == 0) { doProcessDownstreamReadyRsp(pTask); // all downstream tasks are ready, set the complete check downstream flag - streamTaskCompleteCheck(pInfo, id); + streamTaskStopMonitorCheckRsp(pInfo, id); } else { stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 7dc93ceccf..5d725b012c 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -942,15 +942,13 @@ int32_t streamTaskSendCheckpointReq(SStreamTask* pTask) { return 0; } -int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { +static int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOutputInfo, int64_t startTs) { if (pInfo->pList == NULL) { pInfo->pList = taosArrayInit(4, sizeof(SDownstreamStatusInfo)); } else { taosArrayClear(pInfo->pList); } - taosThreadMutexLock(&pInfo->checkInfoLock); - if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { pInfo->notReadyTasks = 1; } else if (pOutputInfo->type == TASK_OUTPUT__SHUFFLE_DISPATCH) { @@ -959,8 +957,6 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; - - taosThreadMutexUnlock(&pInfo->checkInfoLock); return TSDB_CODE_SUCCESS; } @@ -1014,39 +1010,33 @@ int32_t streamTaskUpdateCheckInfo(STaskCheckInfo* pInfo, int32_t taskId, int32_t return 
TSDB_CODE_FAILED; } -int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { if (pInfo->inCheckProcess == 0) { pInfo->inCheckProcess = 1; } else { ASSERT(pInfo->startTs > 0); - stError("s-task:%s already in check procedure, checkTs:%"PRId64, id, pInfo->startTs); - - taosThreadMutexUnlock(&pInfo->checkInfoLock); + stError("s-task:%s already in check procedure, checkTs:%"PRId64", start monitor check rsp failed", id, pInfo->startTs); return TSDB_CODE_FAILED; } - taosThreadMutexUnlock(&pInfo->checkInfoLock); stDebug("s-task:%s set the in-check-procedure flag", id); return 0; } -int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { - taosThreadMutexLock(&pInfo->checkInfoLock); +static int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, const char* id) { if (!pInfo->inCheckProcess) { - taosThreadMutexUnlock(&pInfo->checkInfoLock); - return TSDB_CODE_SUCCESS; + stWarn("s-task:%s already not in-check-procedure", id); } int64_t el = taosGetTimestampMs() - pInfo->startTs; - stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; - pInfo->inCheckProcess = 0; pInfo->notReadyTasks = 0; + pInfo->inCheckProcess = 0; + pInfo->stopCheckProcess = 1; taosArrayClear(pInfo->pList); - taosThreadMutexUnlock(&pInfo->checkInfoLock); return 0; } @@ -1108,7 +1098,10 @@ static void rspMonitorFn(void* param, void* tmrId) { if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, id); + + taosThreadMutexLock(&pInfo->checkInfoLock); + 
streamTaskCompleteCheckRsp(pInfo, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); return; @@ -1117,7 +1110,11 @@ static void rspMonitorFn(void* param, void* tmrId) { if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, id); + + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return; } @@ -1127,8 +1124,8 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamTaskCompleteCheck(pInfo, id); return; } @@ -1176,17 +1173,19 @@ static void rspMonitorFn(void* param, void* tmrId) { taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - streamTaskCompleteCheck(pInfo, id); + streamTaskCompleteCheckRsp(pInfo, id); return; } // checking of downstream tasks has been stopped by other threads - if (pInfo->inCheckProcess == 0) { + if (pInfo->stopCheckProcess == 1) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " "timeout:%d, ready:%d ref:%d", id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + + streamTaskCompleteCheckRsp(pInfo, id); taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. 
@@ -1238,10 +1237,10 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); } + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, + stDebug("s-task:%s continue checking rsp in 300ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); @@ -1249,14 +1248,42 @@ static void rspMonitorFn(void* param, void* tmrId) { } int32_t streamTaskStartMonitorCheckRsp(SStreamTask* pTask) { - ASSERT(pTask->taskCheckInfo.checkRspTmr == NULL); + STaskCheckInfo* pInfo = &pTask->taskCheckInfo; + + taosThreadMutexLock(&pInfo->checkInfoLock); + int32_t code = streamTaskStartCheckDownstream(pInfo, pTask->id.idStr); + if (code != TSDB_CODE_SUCCESS) { + + taosThreadMutexUnlock(&pInfo->checkInfoLock); + return TSDB_CODE_FAILED; + } + + streamTaskInitTaskCheckInfo(pInfo, &pTask->outputInfo, taosGetTimestampMs()); int32_t ref = atomic_add_fetch_32(&pTask->status.timerActive, 1); stDebug("s-task:%s start check rsp monit, ref:%d ", pTask->id.idStr, ref); - pTask->taskCheckInfo.checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + + if (pInfo->checkRspTmr == NULL) { + pInfo->checkRspTmr = taosTmrStart(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer); + } else { + taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, pInfo->checkRspTmr); + } + + taosThreadMutexUnlock(&pInfo->checkInfoLock); return 0; } +int32_t streamTaskStopMonitorCheckRsp(STaskCheckInfo* pInfo, const char* id) { + taosThreadMutexLock(&pInfo->checkInfoLock); + streamTaskCompleteCheckRsp(pInfo, id); + 
+ pInfo->stopCheckProcess = 1; + taosThreadMutexUnlock(&pInfo->checkInfoLock); + + stDebug("s-task:%s set stop check rsp mon", id); + return TSDB_CODE_SUCCESS; +} + void streamTaskCleanCheckInfo(STaskCheckInfo* pInfo) { ASSERT(pInfo->inCheckProcess == 0);