Merge pull request #28795 from taosdata/fix/liaohj

fix(stream): set correct value and open inputQ for stream if reset checkpoint.
This commit is contained in:
Shengliang Guan 2024-11-18 09:37:49 +08:00 committed by GitHub
commit 9364d314eb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 27 additions and 10 deletions

View File

@ -1011,6 +1011,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_STREAM_CONFLICT_EVENT TAOS_DEF_ERROR_CODE(0, 0x4106)
#define TSDB_CODE_STREAM_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x4107)
#define TSDB_CODE_STREAM_INPUTQ_FULL TAOS_DEF_ERROR_CODE(0, 0x4108)
#define TSDB_CODE_STREAM_INVLD_CHKPT TAOS_DEF_ERROR_CODE(0, 0x4109)
// TDLite
#define TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS TAOS_DEF_ERROR_CODE(0, 0x5100)

View File

@ -2434,7 +2434,12 @@ static void doAddReportStreamTask(SArray *pList, int64_t reportChkptId, const SC
mDebug("s-task:0x%x expired checkpoint-report msg in checkpoint-report list update from %" PRId64 "->%" PRId64,
pReport->taskId, p->checkpointId, pReport->checkpointId);
memcpy(p, pReport, sizeof(STaskChkptInfo));
// update the checkpoint report info
p->checkpointId = pReport->checkpointId;
p->ts = pReport->checkpointTs;
p->version = pReport->checkpointVer;
p->transId = pReport->transId;
p->dropHTask = pReport->dropHTask;
} else {
mWarn("taskId:0x%x already in checkpoint-report list", pReport->taskId);
}

View File

@ -222,14 +222,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
stError("s-task:%s vgId:%d current checkpointId:%" PRId64
" recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
return code;
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
if (pActiveInfo->failedId >= checkpointId) {
stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
" discard the checkpoint-trigger block",
id, vgId, checkpointId, transId, pActiveInfo->failedId);
return code;
return TSDB_CODE_STREAM_TASK_NOT_EXIST;
}
if (pTask->chkInfo.checkpointId == checkpointId) {
@ -255,8 +255,7 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
"the interrupted checkpoint",
id, vgId, pBlock->srcTaskId);
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
@ -264,14 +263,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
" discard",
id, vgId, pActiveInfo->activeId, checkpointId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
} else { // checkpointId == pActiveInfo->activeId
if (pActiveInfo->allUpstreamTriggerRecv == 1) {
stDebug(
"s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
"checkpointId:%" PRId64 " transId:%d",
id, vgId, checkpointId, transId);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
@ -286,14 +285,14 @@ static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t check
stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
", prev recvTs:%" PRId64 " discard",
pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
return code;
return TSDB_CODE_STREAM_INVLD_CHKPT;
}
}
}
}
}
return 0;
return TSDB_CODE_SUCCESS;
}
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
@ -317,6 +316,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
streamMutexUnlock(&pTask->lock);
if (code) {
if (taskLevel != TASK_LEVEL__SOURCE) { // the checkpoint-trigger is discard, open the inputQ for upstream tasks
streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
}
streamFreeQitem((SStreamQueueItem*)pBlock);
return code;
}
@ -359,6 +362,10 @@ int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock
}
}
#if 0
taosMsleep(20*1000);
#endif
if (taskLevel == TASK_LEVEL__SOURCE) {
int8_t type = pTask->outputInfo.type;
pActiveInfo->allUpstreamTriggerRecv = 1;

View File

@ -1170,6 +1170,7 @@ int32_t streamTaskSendCheckpointReadyMsg(SStreamTask* pTask) {
if (taosArrayGetSize(pTask->upstreamInfo.pList) != num) {
stError("s-task:%s invalid number of sent readyMsg:%d to upstream:%d", id, num,
(int32_t)taosArrayGetSize(pTask->upstreamInfo.pList));
streamMutexUnlock(&pActiveInfo->lock);
return TSDB_CODE_STREAM_INTERNAL_ERROR;
}
@ -1412,6 +1413,7 @@ int32_t streamAddCheckpointSourceRspMsg(SStreamCheckpointSourceReq* pReq, SRpcHa
if (size > 0) {
STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, 0);
if (pReady == NULL) {
streamMutexUnlock(&pActiveInfo->lock);
return terrno;
}

View File

@ -433,6 +433,7 @@ int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
// send hb msg to mnode before closing all tasks.
int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
if (code != TSDB_CODE_SUCCESS) {
streamMetaRUnLock(pMeta);
return code;
}

View File

@ -853,7 +853,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_IVLD_STATUS, "Invalid task status
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_CONFLICT_EVENT, "Stream conflict event")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INTERNAL_ERROR, "Stream internal error")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_NOT_LEADER, "Stream task not on leader vnode")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INPUTQ_FULL, "Task input queue is full")
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_INVLD_CHKPT, "Invalid checkpoint trigger msg")
// TDLite
TAOS_DEFINE_ERROR(TSDB_CODE_TDLITE_IVLD_OPEN_FLAGS, "Invalid TDLite open flags")