refactor: do some internal refactor.

This commit is contained in:
Haojun Liao 2024-08-15 18:29:36 +08:00
parent 5322b60a31
commit c94cd24593
2 changed files with 21 additions and 13 deletions

View File

@ -561,12 +561,14 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d", stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
id, vgId, pReq->taskId, numOfTasks); id, vgId, pReq->taskId, numOfTasks);
} }
streamMetaWLock(pMeta); streamMetaWLock(pMeta);
if (streamMetaCommit(pMeta) < 0) { if (pReq->dropRelHTask) {
// persist to disk code = streamMetaCommit(pMeta);
} }
} }
// always return true
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
@ -594,13 +596,15 @@ int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SV
ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer && ASSERT(pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
pInfo->processedVer <= pReq->checkpointVer); pInfo->processedVer <= pReq->checkpointVer);
// update only it is in checkpoint status. // update only it is in checkpoint status, or during restore procedure.
if (pStatus.state == TASK_STATUS__CK) { if (pStatus.state == TASK_STATUS__CK || (!restored)) {
pInfo->checkpointId = pReq->checkpointId; pInfo->checkpointId = pReq->checkpointId;
pInfo->checkpointVer = pReq->checkpointVer; pInfo->checkpointVer = pReq->checkpointVer;
pInfo->checkpointTime = pReq->checkpointTs; pInfo->checkpointTime = pReq->checkpointTs;
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE); if (restored) {
code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
}
} }
streamTaskClearCheckInfo(pTask, true); streamTaskClearCheckInfo(pTask, true);

View File

@ -891,24 +891,28 @@ int32_t streamMetaBegin(SStreamMeta* pMeta) {
} }
int32_t streamMetaCommit(SStreamMeta* pMeta) { int32_t streamMetaCommit(SStreamMeta* pMeta) {
if (tdbCommit(pMeta->db, pMeta->txn) < 0) { int32_t code = 0;
code = tdbCommit(pMeta->db, pMeta->txn);
if (code != 0) {
stError("vgId:%d failed to commit stream meta", pMeta->vgId); stError("vgId:%d failed to commit stream meta", pMeta->vgId);
return -1; return code;
} }
if (tdbPostCommit(pMeta->db, pMeta->txn) < 0) { code = tdbPostCommit(pMeta->db, pMeta->txn);
if (code != 0) {
stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId); stError("vgId:%d failed to do post-commit stream meta", pMeta->vgId);
return -1; return code;
} }
if (tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL, code = tdbBegin(pMeta->db, &pMeta->txn, tdbDefaultMalloc, tdbDefaultFree, NULL,
TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < 0) { TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED);
if (code != 0) {
stError("vgId:%d failed to begin trans", pMeta->vgId); stError("vgId:%d failed to begin trans", pMeta->vgId);
return -1; return code;
} }
stDebug("vgId:%d stream meta file commit completed", pMeta->vgId); stDebug("vgId:%d stream meta file commit completed", pMeta->vgId);
return 0; return code;
} }
int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) { int64_t streamMetaGetLatestCheckpointId(SStreamMeta* pMeta) {