diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index 4bf74d8d4f..9be8f5ffaa 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -561,12 +561,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", id, vgId, pReq->taskId, numOfTasks); } + streamMetaWLock(pMeta); - if (streamMetaCommit(pMeta) < 0) { - // persist to disk + if (pReq->dropRelHTask) { + code = streamMetaCommit(pMeta); } } + // always return true return TSDB_CODE_SUCCESS; } @@ -594,13 +596,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && pInfo->processedVer <= pReq->checkpointVer); - // update only it is in checkpoint status. - if (pStatus.state == TASK_STATUS__CK) { + // update only it is in checkpoint status, or during restore procedure. + if (pStatus.state == TASK_STATUS__CK || (!restored)) { pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointTime = pReq->checkpointTs; - code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + if (restored) { + code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); + } } streamTaskClearCheckInfo(pTask, true); diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 8379d904c2..7c6461b1c8 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -891,24 +891,28 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) { } int32_t streamMetaCommit(SStreamMeta* pMeta) { - if (tdbCommit(pMeta->db, pMeta->txn) < 0) { + int32_t code = 0; + code = tdbCommit(pMeta->db, pMeta->txn); + if (code != 0) { stError("vgId:%d failed to commit stream meta", pMeta->vgId); - return -1; + return code; } - if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { + code = tdbPostCommit(pMeta->db, pMeta->txn); + if (code != 0) { stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); - return -1; + return code; } - if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, - TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { + code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, + TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED); + if (code != 0) { stError("vgId:%d failed to begin trans", pMeta->vgId); - return -1; + return code; } stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); - return 0; + return code; } int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {