From f7bcff862d336d7b0ea687cdaae7fe84dd1db145 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 22 Sep 2023 10:23:56 +0800 Subject: [PATCH] fix(stream): add check info detailed information. --- include/libs/stream/tstream.h | 6 ++++++ source/dnode/snode/src/snode.c | 2 +- source/dnode/vnode/src/tq/tq.c | 30 +++++++++++++------------- source/libs/stream/src/streamRecover.c | 30 ++++++++++++++++++++------ 4 files changed, 45 insertions(+), 23 deletions(-) diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 65c869433b..d75cfde8d8 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -34,6 +34,12 @@ extern "C" { #define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F) #define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F) + +#define TASK_DOWNSTREAM_READY 0x0 +#define TASK_DOWNSTREAM_NOT_READY 0x1 +#define TASK_DOWNSTREAM_NOT_LEADER 0x2 +#define TASK_SELF_NEW_STAGE 0x3 + typedef struct SStreamTask SStreamTask; #define SSTREAM_TASK_VER 2 diff --git a/source/dnode/snode/src/snode.c b/source/dnode/snode/src/snode.c index ef9c1ebe2e..d6d751304e 100644 --- a/source/dnode/snode/src/snode.c +++ b/source/dnode/snode/src/snode.c @@ -361,7 +361,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) { qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } else { - rsp.status = 0; + rsp.status = TASK_DOWNSTREAM_NOT_READY; qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d", taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 12bc76a004..33be127940 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -889,22 +889,22 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) { if (!pMeta->leader) { tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg", taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId); - return -1; - } - - SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); - if (pTask != NULL) { - rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); - streamMetaReleaseTask(pMeta, pTask); - - const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); - tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", - pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + rsp.status = TASK_DOWNSTREAM_NOT_LEADER; } else { - rsp.status = 0; - tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 - ") from task:0x%x (vgId:%d), rsp status %d", - req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId); + if (pTask != NULL) { + rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage); + streamMetaReleaseTask(pMeta, pTask); + + const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus); + tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d", + pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } else { + rsp.status = TASK_DOWNSTREAM_NOT_READY; + tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64 + ") from task:0x%x (vgId:%d), rsp status %d", + req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status); + } } return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId); diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 83d8fe0b2a..b3838677ce 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -214,7 +214,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ if (pInfo->stage == -1) { pInfo->stage = stage; - stDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id, + stDebug("s-task:%s receive check msg from upstream task:0x%x first time, init stage value:%" PRId64, id, upstreamTaskId, stage); } @@ -223,7 +223,13 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_ id, upstreamTaskId, vgId, stage, pInfo->stage); } - return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0; + if (pTask->status.downstreamReady != 1) { + return TASK_DOWNSTREAM_NOT_READY; + } else if (pInfo->stage != stage) { + return TASK_SELF_NEW_STAGE; + } else { + return TASK_DOWNSTREAM_READY; + } } static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) { @@ -259,7 +265,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs ASSERT(pTask->id.taskId == pRsp->upstreamTaskId); const char* id = pTask->id.idStr; - if (pRsp->status == 1) { + if (pRsp->status == TASK_DOWNSTREAM_READY) { if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) { bool found = false; @@ -298,10 +304,20 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs doProcessDownstreamReadyRsp(pTask, 1); } } else { // not ready, wait for 100ms and retry - stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id, - pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage); - taosMsleep(100); - streamRecheckDownstream(pTask, pRsp); + if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) { + stError("s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, roll-back needed not send check again", + id, pRsp->downstreamTaskId, pRsp->downstreamNodeId); + } else if (pRsp->status == TASK_SELF_NEW_STAGE) { + stError( + "s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, roll-back needed " + "and not send check again", + id, pRsp->oldStage, (int32_t) pTask->pMeta->stage); + } else { + stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id, + pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage); + taosMsleep(100); + streamRecheckDownstream(pTask, pRsp); + } } return 0;