fix(stream): update the timeout measurement.

This commit is contained in:
Haojun Liao 2024-04-28 11:14:10 +08:00
parent ed962186a1
commit 6ea4823f1e
2 changed files with 11 additions and 5 deletions

View File

@ -443,6 +443,7 @@ typedef struct SDownstreamStatusInfo {
typedef struct STaskCheckInfo {
SArray* pList;
int64_t startTs;
int64_t timeoutStartTs;
int32_t notReadyTasks;
int32_t inCheckProcess;
int32_t stopCheckProcess;

View File

@ -272,6 +272,7 @@ int32_t streamTaskInitTaskCheckInfo(STaskCheckInfo* pInfo, STaskOutputInfo* pOut
}
pInfo->startTs = startTs;
pInfo->timeoutStartTs = startTs;
return TSDB_CODE_SUCCESS;
}
@ -346,6 +347,7 @@ int32_t streamTaskCompleteCheckRsp(STaskCheckInfo* pInfo, bool lock, const char*
stDebug("s-task:%s clear the in-check-procedure flag, not in-check-procedure elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->timeoutStartTs = 0;
pInfo->notReadyTasks = 0;
pInfo->inCheckProcess = 0;
pInfo->stopCheckProcess = 0;
@ -458,6 +460,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
int32_t numOfTimeout = taosArrayGetSize(pTimeoutList);
ASSERT(pTask->status.downstreamReady == 0);
pInfo->timeoutStartTs = taosGetTimestampMs();
for (int32_t i = 0; i < numOfTimeout; ++i) {
int32_t taskId = *(int32_t*)taosArrayGet(pTimeoutList, i);
@ -488,7 +491,7 @@ void handleTimeoutDownstreamTasks(SStreamTask* pTask, SArray* pTimeoutList) {
stDebug("s-task:%s vgId:%d %d downstream task(s) all add into nodeUpate list", id, vgId, numOfTimeout);
} else {
stDebug("s-task:%s vgId:%d %d downstream task(s) timeout, send check msg again, retry:%d start time:%" PRId64, id,
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->startTs);
vgId, numOfTimeout, pInfo->timeoutRetryCount, pInfo->timeoutStartTs);
}
}
@ -524,7 +527,7 @@ void rspMonitorFn(void* param, void* tmrId) {
STaskCheckInfo* pInfo = &pTask->taskCheckInfo;
int32_t vgId = pTask->pMeta->vgId;
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
int64_t timeoutDuration = now - pInfo->timeoutStartTs;
ETaskStatus state = pStat->state;
const char* id = pTask->id.idStr;
int32_t numOfReady = 0;
@ -577,7 +580,7 @@ void rspMonitorFn(void* param, void* tmrId) {
SArray* pTimeoutList = taosArrayInit(4, sizeof(int64_t));
if (pStat->state == TASK_STATUS__UNINIT) {
getCheckRspStatus(pInfo, el, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
getCheckRspStatus(pInfo, timeoutDuration, &numOfReady, &numOfFault, &numOfNotRsp, pTimeoutList, pNotReadyList, id);
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
@ -639,8 +642,10 @@ void rspMonitorFn(void* param, void* tmrId) {
taosTmrReset(rspMonitorFn, CHECK_RSP_CHECK_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, ready:%d",
id, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
stDebug(
"s-task:%s vgId:%d continue checking rsp in 300ms, total:%d, notRsp:%d, notReady:%d, fault:%d, timeout:%d, "
"ready:%d",
id, vgId, total, numOfNotRsp, numOfNotReady, numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);