diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 38b669a8b1..f220d65f84 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -570,6 +570,7 @@ void streamMetaInit(); void streamMetaCleanup(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); void streamMetaClose(SStreamMeta* streamMeta); +int32_t streamDoCheckpoint(SStreamMeta* streamMeta); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8e696e5484..eda1f0c8bd 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1512,6 +1512,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); int32_t len = msgLen - sizeof(SMsgHead); + streamDoCheckpoint(pMeta); taosWLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock); return 0; diff --git a/source/libs/stream/src/streamCheckpoint.c b/source/libs/stream/src/streamCheckpoint.c index efd19074da..9294e8da06 100644 --- a/source/libs/stream/src/streamCheckpoint.c +++ b/source/libs/stream/src/streamCheckpoint.c @@ -130,15 +130,15 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); } -static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { - // commit tdb state - streamStateCommit(pTask->pState); - // commit non-tdb state - // copy and save new state - // report to mnode - // send checkpoint req to downstream - return 0; -} +// static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { +// // commit tdb state +// streamStateCommit(pTask->pState); +// // commit non-tdb state +// // copy and save new state +// // report to mnode +// // send checkpoint req to downstream +// return 0; +// } static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { // ref wal @@ -175,7 +175,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre } } - code = streamDoCheckpoint(pMeta, pTask, checkpointId); + // code = streamDoCheckpoint(pMeta, pTask, checkpointId); if (code < 0) { // rsp error return -1;