fix(stream): set correct value and open inputQ for stream if reset checkpoint.
This commit is contained in:
parent
67a547f65a
commit
9cc2aec9d6
|
@ -2434,7 +2434,12 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
|
||||||
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
|
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
|
||||||
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
pReport->taskId, p->checkpointId, pReport->checkpointId);
|
||||||
|
|
||||||
memcpy(p, pReport, sizeof(STaskChkptInfo));
|
// update the checkpoint report info
|
||||||
|
p->checkpointId = pReport->checkpointId;
|
||||||
|
p->ts = pReport->checkpointTs;
|
||||||
|
p->version = pReport->checkpointVer;
|
||||||
|
p->transId = pReport->transId;
|
||||||
|
p->dropHTask = pReport->dropHTask;
|
||||||
} else {
|
} else {
|
||||||
mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
|
mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
|
||||||
}
|
}
|
||||||
|
|
|
@ -255,8 +255,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
||||||
"the interrupted checkpoint",
|
"the interrupted checkpoint",
|
||||||
id, vgId, pBlock->srcTaskId);
|
id, vgId, pBlock->srcTaskId);
|
||||||
|
|
||||||
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||||
return code;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
|
||||||
|
@ -264,14 +263,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
||||||
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
|
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
|
||||||
" discard",
|
" discard",
|
||||||
id, vgId, pActiveInfo->activeId, checkpointId);
|
id, vgId, pActiveInfo->activeId, checkpointId);
|
||||||
return code;
|
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||||
} else { // checkpointId == pActiveInfo->activeId
|
} else { // checkpointId == pActiveInfo->activeId
|
||||||
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
|
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
|
||||||
stDebug(
|
stDebug(
|
||||||
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
|
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
|
||||||
"checkpointId:%" PRId64 " transId:%d",
|
"checkpointId:%" PRId64 " transId:%d",
|
||||||
id, vgId, checkpointId, transId);
|
id, vgId, checkpointId, transId);
|
||||||
return code;
|
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
|
||||||
|
@ -286,14 +285,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
|
||||||
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
|
||||||
", prev recvTs:%" PRId64 " discard",
|
", prev recvTs:%" PRId64 " discard",
|
||||||
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
|
||||||
return code;
|
return TSDB_CODE_STREAM_INVLD_CHKPT;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
|
||||||
|
@ -317,6 +316,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
|
||||||
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
|
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
|
||||||
streamMutexUnlock(&pTask->lock);
|
streamMutexUnlock(&pTask->lock);
|
||||||
if (code) {
|
if (code) {
|
||||||
|
if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks
|
||||||
|
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
|
||||||
|
}
|
||||||
|
|
||||||
streamFreeQitem((SStreamQueueItem*)pBlock);
|
streamFreeQitem((SStreamQueueItem*)pBlock);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue