fix(stream): send checkpoint-source-rsp to mnode before reset task status.
This commit is contained in:
parent
653f7a1a43
commit
47b0a0464e
|
@ -801,6 +801,7 @@ int32_t streamTaskBroadcastRetrieveReq(SStreamTask* pTask, SStreamRetrieveReq* r
|
||||||
void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
|
void streamTaskSendRetrieveRsp(SStreamRetrieveReq* pReq, SRpcMsg* pRsp);
|
||||||
|
|
||||||
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
|
int32_t streamProcessHeartbeatRsp(SStreamMeta* pMeta, SMStreamHbRspMsg* pRsp);
|
||||||
|
int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -1239,9 +1239,6 @@ static int32_t mndProcessStreamCheckpoint(SRpcMsg *pReq) {
|
||||||
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
|
code = mndProcessStreamCheckpointTrans(pMnode, p, checkpointId, 1, true);
|
||||||
sdbRelease(pSdb, p);
|
sdbRelease(pSdb, p);
|
||||||
|
|
||||||
// clear the consensus checkpoint info
|
|
||||||
mndClearConsensusCheckpointId(execInfo.pStreamConsensus, p->uid);
|
|
||||||
|
|
||||||
if (code != -1) {
|
if (code != -1) {
|
||||||
started += 1;
|
started += 1;
|
||||||
|
|
||||||
|
|
|
@ -228,6 +228,9 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
}
|
}
|
||||||
|
|
||||||
updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
updated = streamTaskUpdateEpsetInfo(pTask, req.pNodeList);
|
||||||
|
|
||||||
|
// send the checkpoint-source-rsp for source task to end the checkpoint trans in mnode
|
||||||
|
streamTaskSendPreparedCheckpointsourceRsp(pTask);
|
||||||
streamTaskResetStatus(pTask);
|
streamTaskResetStatus(pTask);
|
||||||
|
|
||||||
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
streamTaskStopMonitorCheckRsp(&pTask->taskCheckInfo, pTask->id.idStr);
|
||||||
|
@ -264,7 +267,6 @@ int32_t tqStreamTaskProcessUpdateReq(SStreamMeta* pMeta, SMsgCb* cb, SRpcMsg* pM
|
||||||
tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
|
tqDebug("s-task:%s vgId:%d not save task since not update epset actually, stop task", idstr, vgId);
|
||||||
}
|
}
|
||||||
|
|
||||||
// stop
|
|
||||||
streamTaskStop(pTask);
|
streamTaskStop(pTask);
|
||||||
if (ppHTask != NULL) {
|
if (ppHTask != NULL) {
|
||||||
streamTaskStop(*ppHTask);
|
streamTaskStop(*ppHTask);
|
||||||
|
|
|
@ -1146,4 +1146,20 @@ int32_t streamTaskSendRestoreChkptMsg(SStreamTask* pTask) {
|
||||||
|
|
||||||
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
tmsgSendReq(&pTask->info.mnodeEpset, &msg);
|
||||||
return 0;
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t streamTaskSendPreparedCheckpointsourceRsp(SStreamTask* pTask) {
|
||||||
|
int32_t code = 0;
|
||||||
|
if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
taosThreadMutexLock(&pTask->lock);
|
||||||
|
SStreamTaskState* p = streamTaskGetStatus(pTask);
|
||||||
|
if (p->state == TASK_STATUS__CK) {
|
||||||
|
code = streamTaskSendCheckpointSourceRsp(pTask);
|
||||||
|
}
|
||||||
|
taosThreadMutexUnlock(&pTask->lock);
|
||||||
|
|
||||||
|
return code;
|
||||||
}
|
}
|
Loading…
Reference in New Issue