fix(stream): rsp when checkpoint source failed.

This commit is contained in:
Haojun Liao 2023-09-23 14:13:33 +08:00
parent 7d5f76c6f1
commit acd3321494
4 changed files with 22 additions and 4 deletions

View File

@ -75,6 +75,7 @@ enum {
TASK_INPUT_STATUS__NORMAL = 1, TASK_INPUT_STATUS__NORMAL = 1,
TASK_INPUT_STATUS__BLOCKED, TASK_INPUT_STATUS__BLOCKED,
TASK_INPUT_STATUS__FAILED, TASK_INPUT_STATUS__FAILED,
TASK_INPUT_STATUS__REFUSED,
}; };
enum { enum {

View File

@ -1616,6 +1616,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
SStreamCheckpointSourceReq req = {0}; SStreamCheckpointSourceReq req = {0};
if (!vnodeIsRoleLeader(pTq->pVnode)) { if (!vnodeIsRoleLeader(pTq->pVnode)) {
tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId); tqDebug("vgId:%d not leader, ignore checkpoint-source msg", vgId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1625,6 +1628,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
code = TSDB_CODE_MSG_DECODE_ERROR; code = TSDB_CODE_MSG_DECODE_ERROR;
tDecoderClear(&decoder); tDecoderClear(&decoder);
tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code)); tqError("vgId:%d failed to decode checkpoint-source msg, code:%s", vgId, tstrerror(code));
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return code; return code;
} }
tDecoderClear(&decoder); tDecoderClear(&decoder);
@ -1633,6 +1639,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
if (pTask == NULL) { if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId, tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
req.taskId); req.taskId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -1681,6 +1690,9 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRs
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1); code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return code; return code;
} }

View File

@ -225,15 +225,15 @@ int32_t streamProcessDispatchMsg(SStreamTask* pTask, SStreamDispatchReq* pReq, S
ASSERT(pInfo != NULL); ASSERT(pInfo != NULL);
if (!pTask->pMeta->leader) { if (!pTask->pMeta->leader) {
stError("s-task:%s task on follower received dispatch msgs, should discard it, not now", id); stError("s-task:%s task on follower received dispatch msgs, dispatch msg rejected", id);
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__REFUSED;
} else { } else {
if (pReq->stage > pInfo->stage) { if (pReq->stage > pInfo->stage) {
// upstream task has restarted/leader-follower switch/transferred to other dnodes // upstream task has restarted/leader-follower switch/transferred to other dnodes
stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64 stError("s-task:%s upstream task:0x%x (vgId:%d) has restart/leader-switch/vnode-transfer, prev stage:%" PRId64
", current:%" PRId64 " dispatch msg rejected", ", current:%" PRId64 " dispatch msg rejected",
id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage); id, pReq->upstreamTaskId, pReq->upstreamNodeId, pInfo->stage, pReq->stage);
status = TASK_INPUT_STATUS__BLOCKED; status = TASK_INPUT_STATUS__REFUSED;
} else { } else {
if (!pInfo->dataAllowed) { if (!pInfo->dataAllowed) {
stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId); stWarn("s-task:%s data from task:0x%x is denied, since inputQ is closed for it", id, pReq->upstreamTaskId);

View File

@ -1046,7 +1046,9 @@ static int32_t handleDispatchSuccessRsp(SStreamTask* pTask, int32_t downstreamId
pTask->id.idStr, downstreamId, el); pTask->id.idStr, downstreamId, el);
// put data into inputQ of current task is also allowed // put data into inputQ of current task is also allowed
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL; if (pTask->inputInfo.status == TASK_INPUT_STATUS__BLOCKED) {
pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
}
// now ready for next data output // now ready for next data output
atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL); atomic_store_8(&pTask->outputInfo.status, TASK_OUTPUT_STATUS__NORMAL);
@ -1105,6 +1107,9 @@ int32_t streamProcessDispatchRsp(SStreamTask* pTask, SStreamDispatchRsp* pRsp, i
stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id, stError("s-task:%s inputQ of downstream task:0x%x(vgId:%d) is full, wait for %dms and retry dispatch data", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS); pRsp->downstreamTaskId, pRsp->downstreamNodeId, DISPATCH_RETRY_INTERVAL_MS);
} else if (pRsp->inputStatus == TASK_INPUT_STATUS__REFUSED) {
stError("s-task:%s downstream task:0x%x(vgId:%d) refused the dispatch msg, treat it as success", id,
pRsp->downstreamTaskId, pRsp->downstreamNodeId);
} }
// transtate msg has been sent to downstream successfully. let's transfer the fill-history task state // transtate msg has been sent to downstream successfully. let's transfer the fill-history task state