fix(stream):add related fill-history task to be done, when check-rsp moniting quit.

This commit is contained in:
Haojun Liao 2024-04-23 16:55:47 +08:00
parent 7fda9d9fcb
commit 10c792efa2
2 changed files with 12 additions and 6 deletions

View File

@ -219,7 +219,7 @@ void streamTaskCheckDownstream(SStreamTask* pTask) {
streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr); streamTaskAddReqInfo(&pTask->taskCheckInfo, req.reqId, req.downstreamTaskId, pTask->id.idStr);
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 stDebug("s-task:%s (vgId:%d) stage:%" PRId64
" check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:%" PRIx64, " check downstream task:0x%x (vgId:%d) (shuffle), idx:%d, reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId); pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i, req.reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
} }

View File

@ -1091,7 +1091,7 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
req.reqId = p->reqId; req.reqId = p->reqId;
req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId; req.downstreamNodeId = pOutputInfo->fixedDispatcher.nodeId;
req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId; req.downstreamTaskId = pOutputInfo->fixedDispatcher.taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) req:0x%" PRIx64, stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId); pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, req.reqId);
streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet); streamSendCheckMsg(pTask, &req, pOutputInfo->fixedDispatcher.nodeId, &pOutputInfo->fixedDispatcher.epSet);
@ -1107,8 +1107,10 @@ static void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) {
req.downstreamNodeId = pVgInfo->vgId; req.downstreamNodeId = pVgInfo->vgId;
req.downstreamTaskId = pVgInfo->taskId; req.downstreamTaskId = pVgInfo->taskId;
stDebug("s-task:%s (vgId:%d) stage:%" PRId64 " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d", stDebug("s-task:%s (vgId:%d) stage:%" PRId64
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i); " re-send check downstream task:0x%x(vgId:%d) (shuffle), idx:%d reqId:0x%" PRIx64,
pTask->id.idStr, pTask->info.nodeId, req.stage, req.downstreamTaskId, req.downstreamNodeId, i,
p->reqId);
streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet); streamSendCheckMsg(pTask, &req, pVgInfo->vgId, &pVgInfo->epSet);
break; break;
} }
@ -1158,7 +1160,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
int32_t numOfNotReady = 0; int32_t numOfNotReady = 0;
int32_t numOfTimeout = 0; int32_t numOfTimeout = 0;
stDebug("s-task:%s start to do check downstream rsp check", id); stDebug("s-task:%s start to do check-downstream-rsp check in tmr", id);
if (state == TASK_STATUS__STOP) { if (state == TASK_STATUS__STOP) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
@ -1169,6 +1171,10 @@ static void rspMonitorFn(void* param, void* tmrId) {
taosThreadMutexUnlock(&pInfo->checkInfoLock); taosThreadMutexUnlock(&pInfo->checkInfoLock);
streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false); streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, pInfo->startTs, now, false);
if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
STaskId* pHId = &pTask->hTaskInfo.id;
streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, pInfo->startTs, now, false);
}
return; return;
} }
@ -1207,7 +1213,7 @@ static void rspMonitorFn(void* param, void* tmrId) {
// fault tasks detected, not try anymore // fault tasks detected, not try anymore
ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList)); ASSERT((numOfReady + numOfFault + numOfNotReady + numOfTimeout + numOfNotRsp) == taosArrayGetSize(pInfo->pList));
if ((numOfNotRsp == 0) && (numOfFault > 0)) { if (numOfFault > 0) {
int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1); int32_t ref = atomic_sub_fetch_32(&pTask->status.timerActive, 1);
stDebug( stDebug(
"s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart " "s-task:%s status:%s vgId:%d all rsp. quit from monitor rsp tmr, since vnode-transfer/leader-change/restart "