From 6ea4823f1e87f6b8397865c74ae61ab2520976a8 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sun, 28 Apr 2024 11:14:10 +0800 Subject: [PATCH] fix(stream): update the timeout measurement. --- include/libs/stream/tstream.h | 1 + source/libs/stream/src/streamCheckStatus.c | 15 ++++++++++----- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 119a77cbe8..9b7a433c18 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -443,6 +443,7 @@ typedef struct SDownstreamStatusInfo { typedef struct STaskCheckInfo { SArray* pList; int64_t startTs; + int64_t timeoutStartTs; int32_t notReadyTasks; int32_t inCheckProcess; int32_t stopCheckProcess; diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index 0a87833055..152f890cc6 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -272,6 +272,7 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut } pInfo->startTs = startTs; + pInfo->timeoutStartTs = startTs; return TSDB_CODE_SUCCESS; } @@ -346,6 +347,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char* stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; + pInfo->timeoutStartTs = 0; pInfo->notReadyTasks = 0; pInfo->inCheckProcess = 0; pInfo->stopCheckProcess = 0; @@ -458,6 +460,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { int32_t numOfTimeout = taosArrayGetSize(pTimeoutList); ASSERT(pTask->status.downstreamReady == 0); + pInfo->timeoutStartTs = taosGetTimestampMs(); for (int32_t i = 0; i < numOfTimeout; ++i) { int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i); @@ -488,7 +491,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) { stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout); } else { stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id, - vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs); + vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs); } } @@ -524,7 +527,7 @@ void rspMonitorFn(void* param, void* tmrId) { STaskCheckInfo* pInfo = &pTask->taskCheckInfo; int32_t vgId = pTask->pMeta->vgId; int64_t now = taosGetTimestampMs(); - int64_t el = now - pInfo->startTs; + int64_t timeoutDuration = now - pInfo->timeoutStartTs; ETaskStatus state = pStat->state; const char* id = pTask->id.idStr; int32_t numOfReady = 0; @@ -577,7 +580,7 @@ void rspMonitorFn(void* param, void* tmrId) { SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t)); if (pStat->state == TASK_STATUS__UNINIT) { - getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); + getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id); } else { // unexpected status stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } @@ -639,8 +642,10 @@ void rspMonitorFn(void* param, void* tmrId) { taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d", - id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug( + "s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, " + "ready:%d", + id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList);