fix(stream): record the task check downstream failed info.

This commit is contained in:
Haojun Liao 2024-04-19 16:21:05 +08:00
parent 3769da535e
commit 3baa47f266
2 changed files with 44 additions and 29 deletions

View File

@ -1288,7 +1288,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) {
}
SStreamTask* pTask = *(SStreamTask**)pIter;
stDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr);
stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr);
streamTaskStop(pTask);
}
@ -1649,6 +1649,13 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3
return 0;
}
void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id));
if (p == NULL) { // task does not exists in current vnode, not record the complete info
qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId);
streamMetaWUnLock(pMeta);
return 0;
}
SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet;
STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};

View File

@ -1026,7 +1026,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) {
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
stDebug("s-task:%s set the in check procedure flag", id);
stDebug("s-task:%s set the in-check-procedure flag", id);
return 0;
}
@ -1038,7 +1038,7 @@ int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) {
}
int64_t el = taosGetTimestampMs() - pInfo->startTs;
stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el);
stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el);
pInfo->startTs = 0;
pInfo->inCheckProcess = 0;
@ -1098,27 +1098,36 @@ static void rspMonitorFn(void* param, void* tmrId) {
int64_t now = taosGetTimestampMs();
int64_t el = now - pInfo->startTs;
ETaskStatus state = pStat->state;
int32_t numOfReady = 0;
int32_t numOfFault = 0;
const char* id = pTask->id.idStr;
stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr);
stDebug("s-task:%s start to do check downstream rsp check", id);
if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, id);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
return;
}
if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref);
streamTaskCompleteCheck(pInfo, id);
return;
}
taosThreadMutexLock(&pInfo->checkInfoLock);
if (pInfo->notReadyTasks == 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr,
pStat->name, vgId, ref);
stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name,
vgId, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
streamTaskCompleteCheck(pInfo, id);
return;
}
@ -1131,13 +1140,12 @@ static void rspMonitorFn(void* param, void* tmrId) {
if (p->status == TASK_DOWNSTREAM_READY) {
numOfReady += 1;
} else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) {
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr,
p->taskId);
stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId);
numOfFault += 1;
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
} else { // TASK_DOWNSTREAM_NOT_READY
if (p->rspTs == 0) { // not response yet
ASSERT(p->status == -1);
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec.
taosArrayPush(pTimeoutList, &p->taskId);
} else { // el < CHECK_NOT_RSP_DURATION
// do nothing and continue waiting for their rsps
@ -1148,25 +1156,26 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
} else { // unexpected status
stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name);
stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name);
}
int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList);
int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList);
// fault tasks detected, not try anymore
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) {
if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) &&
(numOfFault > 0)) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "
"detected, ref:%d",
pTask->id.idStr, pStat->name, vgId, ref);
"detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d",
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);
streamTaskCompleteCheck(pInfo, pTask->id.idStr);
streamTaskCompleteCheck(pInfo, id);
return;
}
@ -1176,14 +1185,14 @@ static void rspMonitorFn(void* param, void* tmrId) {
stDebug(
"s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, "
"timeout:%d, ready:%d ref:%d",
pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref);
taosThreadMutexUnlock(&pInfo->checkInfoLock);
// add the not-ready tasks into the final task status result buf, along with related fill-history task if exists.
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false);
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
return;
}
@ -1205,7 +1214,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady);
stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady);
}
if (numOfTimeout > 0) {
@ -1225,15 +1234,14 @@ static void rspMonitorFn(void* param, void* tmrId) {
}
}
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr,
numOfTimeout, now);
stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now);
}
taosThreadMutexUnlock(&pInfo->checkInfoLock);
taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr);
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr,
numOfNotReady, numOfFault, numOfTimeout, numOfReady);
stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady,
numOfFault, numOfTimeout, numOfReady);
taosArrayDestroy(pNotReadyList);
taosArrayDestroy(pTimeoutList);