fix(stream): add detailed status information to the task check response.

This commit is contained in:
Haojun Liao 2023-09-22 10:23:56 +08:00
parent b1ba716299
commit f7bcff862d
4 changed files with 45 additions and 23 deletions

View File

@ -34,6 +34,12 @@ extern "C" {
#define SIZE_IN_MiB(_v) ((_v) / ONE_MiB_F)
#define SIZE_IN_KiB(_v) ((_v) / ONE_KiB_F)
// Status codes placed in the task-check response (rsp.status) and returned by streamTaskCheckStatus().
// The upstream task only re-sends the check for TASK_DOWNSTREAM_NOT_READY; the NOT_LEADER and
// NEW_STAGE codes are logged as errors and the check is not sent again.

// downstream task is ready and the stage recorded for the upstream equals the stage in the request
#define TASK_DOWNSTREAM_READY 0x0
// downstream task is not ready yet, or the task has not been built on the receiving node
#define TASK_DOWNSTREAM_NOT_READY 0x1
// the vnode that received the check request is not the leader
#define TASK_DOWNSTREAM_NOT_LEADER 0x2
// the stage carried by the request differs from the stage recorded for that upstream task
#define TASK_SELF_NEW_STAGE 0x3
typedef struct SStreamTask SStreamTask;
#define SSTREAM_TASK_VER 2

View File

@ -361,7 +361,7 @@ int32_t sndProcessStreamTaskCheckReq(SSnode *pSnode, SRpcMsg *pMsg) {
qDebug("s-task:%s status:%s, recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = 0;
rsp.status = TASK_DOWNSTREAM_NOT_READY;
qDebug("recv task check(taskId:0x%x not built yet) req(reqId:0x%" PRIx64 ") from task:0x%x (vgId:%d), rsp status %d",
taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}

View File

@ -889,22 +889,22 @@ int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg) {
if (!pMeta->leader) {
tqError("s-task:0x%x invalid check msg from upstream:0x%x(vgId:%d), vgId:%d is follower, not handle check status msg",
taskId, req.upstreamTaskId, req.upstreamNodeId, pMeta->vgId);
return -1;
}
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
rsp.status = TASK_DOWNSTREAM_NOT_LEADER;
} else {
rsp.status = 0;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, taskId);
if (pTask != NULL) {
rsp.status = streamTaskCheckStatus(pTask, req.upstreamTaskId, req.upstreamNodeId, req.stage);
streamMetaReleaseTask(pMeta, pTask);
const char* pStatus = streamGetTaskStatusStr(pTask->status.taskStatus);
tqDebug("s-task:%s status:%s, stage:%d recv task check req(reqId:0x%" PRIx64 ") task:0x%x (vgId:%d), ready:%d",
pTask->id.idStr, pStatus, rsp.oldStage, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
} else {
rsp.status = TASK_DOWNSTREAM_NOT_READY;
tqDebug("tq recv task check(taskId:0x%" PRIx64 "-0x%x not built yet) req(reqId:0x%" PRIx64
") from task:0x%x (vgId:%d), rsp status %d",
req.streamId, taskId, rsp.reqId, rsp.upstreamTaskId, rsp.upstreamNodeId, rsp.status);
}
}
return streamSendCheckRsp(pMeta, &req, &rsp, &pMsg->info, taskId);

View File

@ -214,7 +214,7 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
if (pInfo->stage == -1) {
pInfo->stage = stage;
stDebug("s-task:%s receive check msg from upstream task:0x%x for the time, init stage value:%" PRId64, id,
stDebug("s-task:%s receive check msg from upstream task:0x%x first time, init stage value:%" PRId64, id,
upstreamTaskId, stage);
}
@ -223,7 +223,13 @@ int32_t streamTaskCheckStatus(SStreamTask* pTask, int32_t upstreamTaskId, int32_
id, upstreamTaskId, vgId, stage, pInfo->stage);
}
return ((pTask->status.downstreamReady == 1) && (pInfo->stage == stage))? 1:0;
if (pTask->status.downstreamReady != 1) {
return TASK_DOWNSTREAM_NOT_READY;
} else if (pInfo->stage != stage) {
return TASK_SELF_NEW_STAGE;
} else {
return TASK_DOWNSTREAM_READY;
}
}
static void doProcessDownstreamReadyRsp(SStreamTask* pTask, int32_t numOfReqs) {
@ -259,7 +265,7 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
ASSERT(pTask->id.taskId == pRsp->upstreamTaskId);
const char* id = pTask->id.idStr;
if (pRsp->status == 1) {
if (pRsp->status == TASK_DOWNSTREAM_READY) {
if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
bool found = false;
@ -298,10 +304,20 @@ int32_t streamProcessCheckRsp(SStreamTask* pTask, const SStreamTaskCheckRsp* pRs
doProcessDownstreamReadyRsp(pTask, 1);
}
} else { // not ready, wait for 100ms and retry
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage);
taosMsleep(100);
streamRecheckDownstream(pTask, pRsp);
if (pRsp->status == TASK_DOWNSTREAM_NOT_LEADER) {
stError("s-task:%s downstream taskId:0x%x (vgId:%d) vnode-transfer/leader-change detected, roll-back needed not send check again",
id, pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} else if (pRsp->status == TASK_SELF_NEW_STAGE) {
stError(
"s-task:%s vnode-transfer/leader-change/restart detected, old stage:%d, current stage:%d, roll-back needed "
"and not send check again",
id, pRsp->oldStage, (int32_t) pTask->pMeta->stage);
} else {
stDebug("s-task:%s downstream taskId:0x%x (vgId:%d) not ready, stage:%d, wait for 100ms and retry", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, pRsp->oldStage);
taosMsleep(100);
streamRecheckDownstream(pTask, pRsp);
}
}
return 0;