diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 4aecb6cc9d..888d6df171 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -1288,7 +1288,7 @@ void streamMetaNotifyClose(SStreamMeta* pMeta) { } SStreamTask* pTask = *(SStreamTask**)pIter; - stDebug("vgId:%d s-task:%s set closing flag", vgId, pTask->id.idStr); + stDebug("vgId:%d s-task:%s set task closing flag", vgId, pTask->id.idStr); streamTaskStop(pTask); } @@ -1649,6 +1649,13 @@ int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int3 return 0; } + void* p = taosHashGet(pMeta->pTasksMap, &id, sizeof(id)); + if (p == NULL) { // task does not exists in current vnode, not record the complete info + qError("vgId:%d s-task:0x%x not exists discard the check downstream info", pMeta->vgId, taskId); + streamMetaWUnLock(pMeta); + return 0; + } + SHashObj* pDst = ready ? pStartInfo->pReadyTaskSet : pStartInfo->pFailedTaskSet; STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready}; diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 70c2619f6f..8ad24b8edd 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -1026,7 +1026,7 @@ int32_t streamTaskStartCheckDownstream(STaskCheckInfo* pInfo, const char* id) { } taosThreadMutexUnlock(&pInfo->checkInfoLock); - stDebug("s-task:%s set the in check procedure flag", id); + stDebug("s-task:%s set the in-check-procedure flag", id); return 0; } @@ -1038,7 +1038,7 @@ int32_t streamTaskCompleteCheck(STaskCheckInfo* pInfo, const char* id) { } int64_t el = taosGetTimestampMs() - pInfo->startTs; - stDebug("s-task:%s check downstream completed, elapsed time:%" PRId64 " ms", id, el); + stDebug("s-task:%s clear the in-check-procedure flag, elapsed time:%" PRId64 " ms", id, el); pInfo->startTs = 0; pInfo->inCheckProcess = 0; @@ -1098,27 +1098,36 @@ static void rspMonitorFn(void* param, void* tmrId) { int64_t now = taosGetTimestampMs(); int64_t el = now - pInfo->startTs; ETaskStatus state = pStat->state; - int32_t numOfReady = 0; int32_t numOfFault = 0; + const char* id = pTask->id.idStr; - stDebug("s-task:%s start to do check downstream rsp check", pTask->id.idStr); + stDebug("s-task:%s start to do check downstream rsp check", id); - if (state == TASK_STATUS__STOP || state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + if (state == TASK_STATUS__STOP) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d quit from monitor rsp tmr, ref:%d", pTask->id.idStr, pStat->name, vgId, ref); - streamTaskCompleteCheck(pInfo, pTask->id.idStr); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheck(pInfo, id); + + streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); + return; + } + + if (state == TASK_STATUS__DROPPING || state == TASK_STATUS__READY) { + int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); + stDebug("s-task:%s status:%s vgId:%d quit from monitor check-rsp tmr, ref:%d", id, pStat->name, vgId, ref); + streamTaskCompleteCheck(pInfo, id); return; } taosThreadMutexLock(&pInfo->checkInfoLock); if (pInfo->notReadyTasks == 0) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); - stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", pTask->id.idStr, - pStat->name, vgId, ref); + stDebug("s-task:%s status:%s vgId:%d all downstream ready, quit from monitor rsp tmr, ref:%d", id, pStat->name, + vgId, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); - streamTaskCompleteCheck(pInfo, pTask->id.idStr); + streamTaskCompleteCheck(pInfo, id); return; } @@ -1131,13 +1140,12 @@ static void rspMonitorFn(void* param, void* tmrId) { if (p->status == TASK_DOWNSTREAM_READY) { numOfReady += 1; } else if (p->status == TASK_UPSTREAM_NEW_STAGE || p->status == TASK_DOWNSTREAM_NOT_LEADER) { - stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", pTask->id.idStr, - p->taskId); + stDebug("s-task:%s recv status from downstream, task:0x%x, quit from check downstream tasks", id, p->taskId); numOfFault += 1; - } else { // TASK_DOWNSTREAM_NOT_READY - if (p->rspTs == 0) { // not response yet + } else { // TASK_DOWNSTREAM_NOT_READY + if (p->rspTs == 0) { // not response yet ASSERT(p->status == -1); - if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. + if (el >= CHECK_NOT_RSP_DURATION) { // not receive info for 10 sec. taosArrayPush(pTimeoutList, &p->taskId); } else { // el < CHECK_NOT_RSP_DURATION // do nothing and continue waiting for their rsps @@ -1148,25 +1156,26 @@ static void rspMonitorFn(void* param, void* tmrId) { } } } else { // unexpected status - stError("s-task:%s unexpected task status:%s during waiting for check rsp", pTask->id.idStr, pStat->name); + stError("s-task:%s unexpected task status:%s during waiting for check rsp", id, pStat->name); } int32_t numOfNotReady = (int32_t)taosArrayGetSize(pNotReadyList); int32_t numOfTimeout = (int32_t)taosArrayGetSize(pTimeoutList); // fault tasks detected, not try anymore - if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && (numOfFault > 0)) { + if (((numOfReady + numOfFault + numOfNotReady + numOfTimeout) == taosArrayGetSize(pInfo->pList)) && + (numOfFault > 0)) { int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); stDebug( "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " - "detected, ref:%d", - pTask->id.idStr, pStat->name, vgId, ref); + "detected, notReady:%d, fault:%d, timeout:%d, ready:%d ref:%d", + id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList); - streamTaskCompleteCheck(pInfo, pTask->id.idStr); + streamTaskCompleteCheck(pInfo, id); return; } @@ -1176,14 +1185,14 @@ static void rspMonitorFn(void* param, void* tmrId) { stDebug( "s-task:%s status:%s vgId:%d stopped by other threads to check downstream process, notReady:%d, fault:%d, " "timeout:%d, ready:%d ref:%d", - pTask->id.idStr, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); + id, pStat->name, vgId, numOfNotReady, numOfFault, numOfTimeout, numOfReady, ref); taosThreadMutexUnlock(&pInfo->checkInfoLock); // add the not-ready tasks into the final task status result buf, along with related fill-history task if exists. streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); if (HAS_RELATED_FILLHISTORY_TASK(pTask)) { - SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo; - streamMetaAddTaskLaunchResult(pTask->pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, pInfo->startTs, now, false); + STaskId* pHId = &pTask->hTaskInfo.id; + streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false); } return; } @@ -1205,7 +1214,7 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", pTask->id.idStr, numOfNotReady); + stDebug("s-task:%s %d downstream task(s) not ready, send check msg again", id, numOfNotReady); } if (numOfTimeout > 0) { @@ -1225,15 +1234,14 @@ static void rspMonitorFn(void* param, void* tmrId) { } } - stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, pTask->id.idStr, - numOfTimeout, now); + stDebug("s-task:%s %d downstream tasks timeout, send check msg again, start ts:%" PRId64, id, numOfTimeout, now); } taosThreadMutexUnlock(&pInfo->checkInfoLock); taosTmrReset(rspMonitorFn, CHECK_RSP_INTERVAL, pTask, streamTimer, &pInfo->checkRspTmr); - stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", pTask->id.idStr, - numOfNotReady, numOfFault, numOfTimeout, numOfReady); + stDebug("s-task:%s continue checking rsp in 200ms, notReady:%d, fault:%d, timeout:%d, ready:%d", id, numOfNotReady, + numOfFault, numOfTimeout, numOfReady); taosArrayDestroy(pNotReadyList); taosArrayDestroy(pTimeoutList);