diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 1db62abfc0..02fa31ef07 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -819,9 +819,6 @@ ETaskStatus streamTaskGetPrevStatus(const SStreamTask* pTask); void initRpcMsg(SRpcMsg* pMsg, int32_t msgType, void* pCont, int32_t contLen); -// recover and fill history -void streamTaskCheckDownstream(SStreamTask* pTask); - int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamId, int32_t vgId, int64_t stage, int64_t* oldStage); bool streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList); void streamTaskResetUpstreamStageInfo(SStreamTask* pTask); @@ -838,8 +835,10 @@ int32_t streamTaskHandleEventAsync(SStreamTaskSM* pSM, EStreamTaskEvent event, _ int32_t streamTaskOnHandleEventSuccess(SStreamTaskSM* pSM, EStreamTaskEvent event, __state_trans_user_fn callbackFn, void* param); int32_t streamTaskRestoreStatus(SStreamTask* pTask); -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, - SRpcHandleInfo* pRpcInfo, int32_t taskId); +void streamTaskSendCheckMsg(SStreamTask* pTask); +void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp); +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, + int32_t taskId); int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp); int32_t streamLaunchFillHistoryTask(SStreamTask* pTask); int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated); diff --git a/source/dnode/vnode/src/tqCommon/tqCommon.c b/source/dnode/vnode/src/tqCommon/tqCommon.c index be3023122f..4d78f6826c 100644 --- a/source/dnode/vnode/src/tqCommon/tqCommon.c +++ b/source/dnode/vnode/src/tqCommon/tqCommon.c @@ -406,50 +406,16 @@ int32_t tqStreamTaskProcessCheckReq(SStreamMeta* pMeta, SRpcMsg* pMsg) { int32_t msgLen = pMsg->contLen - sizeof(SMsgHead); SStreamTaskCheckReq req; - SDecoder decoder; + SStreamTaskCheckRsp rsp = {0}; + + SDecoder decoder; tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecodeStreamTaskCheckReq(&decoder, &req); tDecoderClear(&decoder); - int32_t taskId = req.downstreamTaskId; - - SStreamTaskCheckRsp rsp = { - .reqId = req.reqId, - .streamId = req.streamId, - .childId = req.childId, - .downstreamNodeId = req.downstreamNodeId, - .downstreamTaskId = req.downstreamTaskId, - .upstreamNodeId = req.upstreamNodeId, - .upstreamTaskId = req.upstreamTaskId, - }; - - // only the leader node handle the check request - if (pMeta->role == NODE_ROLE_FOLLOWER) { - tqError( - "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", - taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); - rsp.status = TASK_DOWNSTREAM_NOT_LEADER; - } else { - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); - if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage, &rsp.oldStage); - streamMetaReleaseTask(pMeta, pTask); - - SStreamTaskState* pState = streamTaskGetStatus(pTask); - tqDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 - ") task:0x%x (vgId:%d), check_status:%d", - pTask->id.idStr, pState->name, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, - rsp.status); - } else { - rsp.status = TASK_DOWNSTREAM_NOT_READY; - tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp check_status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); - } - } - - return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); + streamTaskProcessCheckMsg(pMeta, &req, &rsp); + return streamSendCheckRsp(pMeta, req.upstreamNodeId, &rsp, &pMsg->info, req.upstreamTaskId); } int32_t tqStreamTaskProcessCheckRsp(SStreamMeta* pMeta, SRpcMsg* pMsg, bool isLeader) { diff --git a/source/libs/stream/src/streamCheckStatus.c b/source/libs/stream/src/streamCheckStatus.c index e6f127349c..9c3908c833 100644 --- a/source/libs/stream/src/streamCheckStatus.c +++ b/source/libs/stream/src/streamCheckStatus.c @@ -39,7 +39,7 @@ static int32_t addDownstreamFailedStatusResultAsync(SMsgCb* pMsgCb, int32_t vgId static SDownstreamStatusInfo* findCheckRspStatus(STaskCheckInfo* pInfo, int32_t taskId); // check status -void streamTaskCheckDownstream(SStreamTask* pTask) { +void streamTaskSendCheckMsg(SStreamTask* pTask) { SDataRange* pRange = &pTask->dataRange; STimeWindow* pWindow = &pRange->window; const char* idstr = pTask->id.idStr; @@ -97,6 +97,46 @@ void streamTaskCheckDownstream(SStreamTask* pTask) { } } +void streamTaskProcessCheckMsg(SStreamMeta* pMeta, SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp) { + int32_t taskId = pReq->downstreamTaskId; + + *pRsp = (SStreamTaskCheckRsp){ + .reqId = pReq->reqId, + .streamId = pReq->streamId, + .childId = pReq->childId, + .downstreamNodeId = pReq->downstreamNodeId, + .downstreamTaskId = pReq->downstreamTaskId, + .upstreamNodeId = pReq->upstreamNodeId, + .upstreamTaskId = pReq->upstreamTaskId, + }; + + // only the leader node handle the check request + if (pMeta->role == NODE_ROLE_FOLLOWER) { + stError( + "s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", + taskId, pReq->upstreamTaskId, pReq->upstreamNodeId, pMeta->vgId); + pRsp->status = TASK_DOWNSTREAM_NOT_LEADER; + } else { + SStreamTask* pTask = streamMetaAcquireTask(pMeta, pReq->streamId, taskId); + if (pTask != NULL) { + pRsp->status = streamTaskCheckStatus(pTask, pReq->upstreamTaskId, pReq->upstreamNodeId, pReq->stage, &pRsp->oldStage); + streamMetaReleaseTask(pMeta, pTask); + + SStreamTaskState* pState = streamTaskGetStatus(pTask); + stDebug("s-task:%s status:%s, stage:%" PRId64 " recv task check req(reqId:0x%" PRIx64 + ") task:0x%x (vgId:%d), check_status:%d", + pTask->id.idStr, pState->name, pRsp->oldStage, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, + pRsp->status); + } else { + pRsp->status = TASK_DOWNSTREAM_NOT_READY; + stDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp check_status %d", + pReq->streamId, taskId, pRsp->reqId, pRsp->upstreamTaskId, pRsp->upstreamNodeId, pRsp->status); + } + } + +} + int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRsp) { ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); @@ -152,7 +192,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs STaskId* pId = &pTask->hTaskInfo.id; streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, now, false); } - } else { // TASK_DOWNSTREAM_NOT_READY, let's retry in 100ms + } else { // TASK_DOWNSTREAM_NOT_READY, rsp-check monitor will retry in 300 ms ASSERT(left > 0); stDebug("s-task:%s (vgId:%d) recv check rsp from task:0x%x (vgId:%d) status:%d, total:%d not ready:%d", id, pRsp->upstreamNodeId, pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->status, total, left); @@ -162,7 +202,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs return 0; } -int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* pReq, SStreamTaskCheckRsp* pRsp, +int32_t streamSendCheckRsp(const SStreamMeta* pMeta, int32_t vgId, SStreamTaskCheckRsp* pRsp, SRpcHandleInfo* pRpcInfo, int32_t taskId) { SEncoder encoder; int32_t code; @@ -175,7 +215,7 @@ int32_t streamSendCheckRsp(const SStreamMeta* pMeta, const SStreamTaskCheckReq* } void* buf = rpcMallocCont(sizeof(SMsgHead) + len); - ((SMsgHead*)buf)->vgId = htonl(pReq->upstreamNodeId); + ((SMsgHead*)buf)->vgId = htonl(vgId); void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead)); tEncoderInit(&encoder, (uint8_t*)abuf, len); @@ -420,6 +460,9 @@ void doSendCheckMsg(SStreamTask* pTask, SDownstreamStatusInfo* p) { .stage = pTask->pMeta->stage, }; + // update the reqId for the new check msg + p->reqId = tGenIdPI64(); + STaskOutputInfo* pOutputInfo = &pTask->outputInfo; if (pOutputInfo->type == TASK_OUTPUT__FIXED_DISPATCH) { STaskDispatcherFixed* pDispatch = &pOutputInfo->fixedDispatcher; diff --git a/source/libs/stream/src/streamTaskSm.c b/source/libs/stream/src/streamTaskSm.c index c16598f84c..cced6a6b84 100644 --- a/source/libs/stream/src/streamTaskSm.c +++ b/source/libs/stream/src/streamTaskSm.c @@ -84,7 +84,7 @@ int32_t streamTaskInitStatus(SStreamTask* pTask) { stDebug("s-task:%s start init, and check downstream tasks, set the init ts:%" PRId64, pTask->id.idStr, pTask->execInfo.checkTs); - streamTaskCheckDownstream(pTask); + streamTaskSendCheckMsg(pTask); return 0; }