add trigger checkpoint

This commit is contained in:
yihaoDeng 2023-06-09 12:12:51 +00:00
parent 74797f5ba6
commit a2030dc378
3 changed files with 12 additions and 10 deletions

View File

@ -570,6 +570,7 @@ void streamMetaInit();
void streamMetaCleanup(); void streamMetaCleanup();
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc, int32_t vgId);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamDoCheckpoint(SStreamMeta* streamMeta);
int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaSaveTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask); int32_t streamMetaAddDeployedTask(SStreamMeta* pMeta, int64_t ver, SStreamTask* pTask);

View File

@ -1512,6 +1512,7 @@ int32_t tqProcessStreamCheckPointReq(STQ* pTq, int64_t sversion, char* pMsg, int
char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead)); char* msg = POINTER_SHIFT(pMsg, sizeof(SMsgHead));
int32_t len = msgLen - sizeof(SMsgHead); int32_t len = msgLen - sizeof(SMsgHead);
streamDoCheckpoint(pMeta);
taosWLockLatch(&pMeta->lock); taosWLockLatch(&pMeta->lock);
taosWUnLockLatch(&pMeta->lock); taosWUnLockLatch(&pMeta->lock);
return 0; return 0;

View File

@ -130,15 +130,15 @@ static int32_t streamAlignCheckpoint(SStreamTask* pTask, int64_t checkpointId, i
return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1); return atomic_sub_fetch_32(&pTask->checkpointAlignCnt, 1);
} }
static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { // static int32_t streamDoCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) {
// commit tdb state // // commit tdb state
streamStateCommit(pTask->pState); // streamStateCommit(pTask->pState);
// commit non-tdb state // // commit non-tdb state
// copy and save new state // // copy and save new state
// report to mnode // // report to mnode
// send checkpoint req to downstream // // send checkpoint req to downstream
return 0; // return 0;
} // }
static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) { static int32_t streamDoSourceCheckpoint(SStreamMeta* pMeta, SStreamTask* pTask, int64_t checkpointId) {
// ref wal // ref wal
@ -175,7 +175,7 @@ int32_t streamProcessCheckpointReq(SStreamMeta* pMeta, SStreamTask* pTask, SStre
} }
} }
code = streamDoCheckpoint(pMeta, pTask, checkpointId); // code = streamDoCheckpoint(pMeta, pTask, checkpointId);
if (code < 0) { if (code < 0) {
// rsp error // rsp error
return -1; return -1;