fix(stream): set correct error code and open inputq for upstream .

This commit is contained in:
Haojun Liao 2024-11-13 16:41:15 +08:00
parent c9a120e1d0
commit b88a786fdf
2 changed files with 10 additions and 8 deletions

View File

@ -255,8 +255,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
"the interrupted checkpoint", "the interrupted checkpoint",
id, vgId, pBlock->srcTaskId); id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId); return TSDB_CODE_STREAM_INVLD_CHKPT;
return code;
} }
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) { if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
@ -264,14 +263,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64 stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard", " discard",
id, vgId, pActiveInfo->activeId, checkpointId); id, vgId, pActiveInfo->activeId, checkpointId);
return code; return TSDB_CODE_STREAM_INVLD_CHKPT;
} else { // checkpointId == pActiveInfo->activeId } else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) { if (pActiveInfo->allUpstreamTriggerRecv == 1) {
stDebug( stDebug(
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, " "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d", "checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId); id, vgId, checkpointId, transId);
return code; return TSDB_CODE_STREAM_INVLD_CHKPT;
} }
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) { if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -283,17 +282,17 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
} }
if (p->upstreamTaskId == pBlock->srcTaskId) { if (p->upstreamTaskId == pBlock->srcTaskId) {
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64 stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard", ", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs); pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
return code; return TSDB_CODE_STREAM_INVLD_CHKPT;
} }
} }
} }
} }
} }
return 0; return TSDB_CODE_SUCCESS;
} }
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) { int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
@ -317,6 +316,9 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId); code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
streamMutexUnlock(&pTask->lock); streamMutexUnlock(&pTask->lock);
if (code) { if (code) {
if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
}
streamFreeQitem((SStreamQueueItem*)pBlock); streamFreeQitem((SStreamQueueItem*)pBlock);
return code; return code;
} }