refactor: do some internal refactor.
This commit is contained in:
parent
25f798e42f
commit
30498a4028
|
@ -1259,10 +1259,11 @@ int32_t tqProcessTaskCheckPointSourceReq(STQ* pTq, SRpcMsg* pMsg, SRpcMsg* pRsp)
|
||||||
if (pTask->status.downstreamReady != 1) {
|
if (pTask->status.downstreamReady != 1) {
|
||||||
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
pTask->chkInfo.failedId = req.checkpointId; // record the latest failed checkpoint id
|
||||||
pTask->chkInfo.checkpointingId = req.checkpointId;
|
pTask->chkInfo.checkpointingId = req.checkpointId;
|
||||||
|
pTask->chkInfo.transId = req.transId;
|
||||||
|
|
||||||
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
tqError("s-task:%s not ready for checkpoint, since downstream not ready, ignore this checkpoint:%" PRId64
|
||||||
", set it failure",
|
", transId:%d set it failed",
|
||||||
pTask->id.idStr, req.checkpointId);
|
pTask->id.idStr, req.checkpointId, req.transId);
|
||||||
streamMetaReleaseTask(pMeta, pTask);
|
streamMetaReleaseTask(pMeta, pTask);
|
||||||
|
|
||||||
SRpcMsg rsp = {0};
|
SRpcMsg rsp = {0};
|
||||||
|
|
|
@ -1102,6 +1102,7 @@ static int32_t metaHeartbeatToMnodeImpl(SStreamMeta* pMeta) {
|
||||||
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
|
entry.checkpointFailed = ((*pTask)->chkInfo.failedId >= (*pTask)->chkInfo.checkpointingId);
|
||||||
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
|
entry.activeCheckpointId = (*pTask)->chkInfo.checkpointingId;
|
||||||
entry.chkpointTransId = (*pTask)->chkInfo.transId;
|
entry.chkpointTransId = (*pTask)->chkInfo.transId;
|
||||||
|
stInfo("s-task:%s send kill checkpoint trans info, transId:%d", (*pTask)->id.idStr, (*pTask)->chkInfo.transId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if ((*pTask)->exec.pWalReader != NULL) {
|
if ((*pTask)->exec.pWalReader != NULL) {
|
||||||
|
|
Loading…
Reference in New Issue