fix(stream): set correct flag for checkpoint.

This commit is contained in:
Haojun Liao 2024-06-22 11:16:20 +08:00
parent 095510ba41
commit 21e1763ff4
4 changed files with 11 additions and 10 deletions

View File

@ -194,7 +194,7 @@ void streamTaskSetRetryInfoForLaunch(SHistoryTaskInfo* pInfo);
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask);
void streamTaskClearActiveInfo(SActiveCheckpointInfo* pInfo);
void streamClearChkptReadyMsg(SStreamTask* pTask);
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo);
EExtractDataCode streamTaskGetDataFromInputQ(SStreamTask* pTask, SStreamQueueItem** pInput, int32_t* numOfBlocks,
int32_t* blockSize);
int32_t streamQueueItemGetSize(const SStreamQueueItem* pItem);

View File

@ -405,7 +405,7 @@ void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
streamTaskClearActiveInfo(pTask->chkInfo.pActiveInfo);
streamTaskOpenAllUpstreamInput(pTask); // open inputQ for all upstream tasks
if (clearChkpReadyMsg) {
streamClearChkptReadyMsg(pTask);
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
}
}
@ -696,14 +696,16 @@ int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
// TODO: monitoring the checkpoint-report msg
// update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
if (code == TSDB_CODE_SUCCESS && (pTask->pMsgCb != NULL)) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
} else { // clear the checkpoint info if failed
if (code == TSDB_CODE_SUCCESS) {
if (pTask->pMsgCb != NULL) {
code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
}
} else { // clear the checkpoint info if failed
taosThreadMutexLock(&pTask->lock);
streamTaskClearCheckInfo(pTask, false);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
taosThreadMutexUnlock(&pTask->lock);
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
streamTaskSetFailedCheckpointId(pTask);
stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
}

View File

@ -856,7 +856,7 @@ static void checkpointReadyMsgSendMonitorFn(void* param, void* tmrId) {
"and quit from timer, ref:%d",
id, vgId, ref);
streamClearChkptReadyMsg(pTask);
streamClearChkptReadyMsg(pActiveInfo);
taosThreadMutexUnlock(&pActiveInfo->lock);
streamMetaReleaseTask(pTask->pMeta, pTask);
}
@ -1128,8 +1128,7 @@ int32_t streamAddCheckpointReadyMsg(SStreamTask* pTask, int32_t upstreamTaskId,
return 0;
}
void streamClearChkptReadyMsg(SStreamTask* pTask) {
SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
void streamClearChkptReadyMsg(SActiveCheckpointInfo* pActiveInfo) {
if (pActiveInfo == NULL) {
return;
}

View File

@ -254,7 +254,7 @@ void tFreeStreamTask(SStreamTask* pTask) {
walCloseReader(pTask->exec.pWalReader);
}
streamClearChkptReadyMsg(pTask);
streamClearChkptReadyMsg(pTask->chkInfo.pActiveInfo);
if (pTask->msgInfo.pData != NULL) {
clearBufferedDispatchMsg(pTask);