fix(stream): handle the case when the task not in ready state, and the checkpoint msg arrived.

This commit is contained in:
Haojun Liao 2023-08-25 10:12:12 +08:00
parent f7ea875af1
commit ce1a3a379c
4 changed files with 45 additions and 23 deletions

View File

@ -526,6 +526,7 @@ typedef struct {
int32_t nodeId;
int32_t mnodeId;
int64_t expireTime;
int8_t success;
} SStreamCheckpointSourceRsp;
int32_t tEncodeStreamCheckpointSourceReq(SEncoder* pEncoder, const SStreamCheckpointSourceReq* pReq);
@ -726,8 +727,10 @@ int32_t streamTaskReleaseState(SStreamTask* pTask);
int32_t streamTaskReloadState(SStreamTask* pTask);
int32_t streamAlignTransferState(SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SStreamTask* pTask);
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask, int8_t isSucceed);
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed);
#ifdef __cplusplus
}
#endif

View File

@ -1493,17 +1493,19 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t vgId = pTq->pStreamMeta->vgId;
int32_t taskId = htonl(pRsp->upstreamTaskId);
int64_t streamId = htobe64(pRsp->streamId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, streamId, taskId);
int32_t vgId = pTq->pStreamMeta->vgId;
pRsp->upstreamTaskId = htonl(pRsp->upstreamTaskId);
pRsp->streamId = htobe64(pRsp->streamId);
pRsp->downstreamTaskId = htonl(pRsp->downstreamTaskId);
pRsp->downstreamNodeId = htonl(pRsp->downstreamNodeId);
SStreamTask* pTask = streamMetaAcquireTask(pTq->pStreamMeta, pRsp->streamId, pRsp->upstreamTaskId);
if (pTask) {
streamProcessDispatchRsp(pTask, pRsp, pMsg->code);
streamMetaReleaseTask(pTq->pStreamMeta, pTask);
return TSDB_CODE_SUCCESS;
} else {
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, taskId);
tqDebug("vgId:%d failed to handle the dispatch rsp, since find task:0x%x failed", vgId, pRsp->upstreamTaskId);
terrno = TSDB_CODE_STREAM_TASK_NOT_EXIST;
return terrno;
}
@ -1719,6 +1721,7 @@ FAIL:
int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; }
// todo error code cannot be return, since this is invoked by an mnode-launched transaction.
int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t vgId = TD_VID(pTq->pVnode);
SStreamMeta* pMeta = pTq->pStreamMeta;
@ -1738,7 +1741,6 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
}
tDecoderClear(&decoder);
// todo handle the case when the task not in ready state, and the checkpoint msg is arrived.
SStreamTask* pTask = streamMetaAcquireTask(pMeta, req.streamId, req.taskId);
if (pTask == NULL) {
tqError("vgId:%d failed to find s-task:0x%x, ignore checkpoint msg. it may have been destroyed already", vgId,
@ -1746,14 +1748,22 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
return TSDB_CODE_SUCCESS;
}
// downstream not ready, current the stream tasks are not all ready. Ignore this checkpoint req.
if (pTask->status.downstreamReady != 1) {
qError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
", set it failure", pTask->id.idStr, req.checkpointId);
SRpcMsg rsp = {0};
buildCheckpointSourceRsp(&req, &pMsg->info, &rsp, 0);
tmsgSendRsp(&rsp); // error occurs
return TSDB_CODE_SUCCESS;
}
int32_t total = 0;
taosWLockLatch(&pMeta->lock);
// set the initial value for generating check point
// set the mgmt epset info according to the checkout source msg from mnode, todo opt perf
// pMeta->mgmtInfo.epset = req.mgmtEps;
// pMeta->mgmtInfo.mnodeId = req.mnodeId;
// set the mgmt epset info according to the checkout source msg from mnode, todo update mgmt epset if needed
if (pMeta->chkptNotReadyTasks == 0) {
pMeta->chkptNotReadyTasks = taosArrayGetSize(pMeta->pTaskList);
}
@ -1764,7 +1774,7 @@ int32_t tqProcessStreamCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg) {
qDebug("s-task:%s (vgId:%d) level:%d receive checkpoint-source msg, chkpt:%" PRId64 ", total checkpoint req:%d",
pTask->id.idStr, vgId, pTask->info.taskLevel, req.checkpointId, total);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask);
code = streamAddCheckpointSourceRspMsg(&req, &pMsg->info, pTask, 1);
if (code != TSDB_CODE_SUCCESS) {
return code;
}

View File

@ -48,6 +48,7 @@ int32_t tEncodeStreamCheckpointSourceRsp(SEncoder* pEncoder, const SStreamCheckp
if (tEncodeI32(pEncoder, pRsp->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pRsp->nodeId) < 0) return -1;
if (tEncodeI64(pEncoder, pRsp->expireTime) < 0) return -1;
if (tEncodeI8(pEncoder, pRsp->success) < 0) return -1;
tEndEncode(pEncoder);
return pEncoder->pos;
}
@ -59,6 +60,7 @@ int32_t tDecodeStreamCheckpointSourceRsp(SDecoder* pDecoder, SStreamCheckpointSo
if (tDecodeI32(pDecoder, &pRsp->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pRsp->nodeId) < 0) return -1;
if (tDecodeI64(pDecoder, &pRsp->expireTime) < 0) return -1;
if (tDecodeI8(pDecoder, &pRsp->success) < 0) return -1;
tEndDecode(pDecoder);
return 0;
}

View File

@ -28,9 +28,9 @@ typedef struct SBlockName {
} SBlockName;
typedef struct {
int32_t taskId;
int32_t upStreamTaskId;
SEpSet upstreamNodeEpset;
SRpcMsg msg;
SEpSet epset;
} SStreamChkptReadyInfo;
static void doRetryDispatchData(void* param, void* tmrId);
@ -587,9 +587,10 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
for (int32_t i = 0; i < num; ++i) {
SStreamChkptReadyInfo* pInfo = taosArrayGet(pTask->pReadyMsgList, i);
tmsgSendReq(&pInfo->epset, &pInfo->msg);
tmsgSendReq(&pInfo->upstreamNodeEpset, &pInfo->msg);
qDebug("s-task:%s level:%d checkpoint ready msg sent to upstream:0x%x", pTask->id.idStr, pTask->info.taskLevel,
pInfo->taskId);
pInfo->upStreamTaskId);
}
taosArrayClear(pTask->pReadyMsgList);
@ -724,8 +725,8 @@ FAIL:
return code;
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask) {
int32_t buildCheckpointSourceRsp(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo, SRpcMsg* pMsg,
int8_t isSucceed) {
int32_t len = 0;
int32_t code = 0;
SEncoder encoder;
@ -737,6 +738,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
.streamId = pReq->streamId,
.expireTime = pReq->expireTime,
.mnodeId = pReq->mnodeId,
.success = isSucceed,
};
tEncodeSize(tEncodeStreamCheckpointSourceRsp, &rsp, len, code);
@ -757,16 +759,21 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
tEncodeStreamCheckpointSourceRsp(&encoder, &rsp);
tEncoderClear(&encoder);
initRpcMsg(pMsg, 0, pBuf, sizeof(SMsgHead) + len);
pMsg->info = *pRpcInfo;
return 0;
}
int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHandleInfo* pRpcInfo,
SStreamTask* pTask, int8_t isSucceed) {
SStreamChkptReadyInfo info = {0};
initRpcMsg(&info.msg, 0, pBuf, sizeof(SMsgHead) + len);
info.msg.info = *pRpcInfo;
buildCheckpointSourceRsp(pReq, pRpcInfo, &info.msg, isSucceed);
if (pTask->pReadyMsgList == NULL) {
pTask->pReadyMsgList = taosArrayInit(4, sizeof(SStreamChkptReadyInfo));
}
taosArrayPush(pTask->pReadyMsgList, &info);
qDebug("s-task:%s add checkpoint source rsp msg, total:%d", pTask->id.idStr, (int32_t)taosArrayGetSize(pTask->pReadyMsgList));
return TSDB_CODE_SUCCESS;
}
@ -813,7 +820,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
ASSERT(req.upstreamTaskId != 0);
SStreamChkptReadyInfo info = {.taskId = pInfo->taskId, .epset = pInfo->epSet};
SStreamChkptReadyInfo info = {.upStreamTaskId = pInfo->taskId, .upstreamNodeEpset = pInfo->epSet};
initRpcMsg(&info.msg, TDMT_STREAM_TASK_CHECKPOINT_READY, buf, tlen + sizeof(SMsgHead));
info.msg.info.noResp = 1; // refactor later.