fix(stream): register checkpoint trans.

This commit is contained in:
Haojun Liao 2023-11-15 14:29:01 +08:00
parent 51897a679f
commit f299bdb387
1 changed files with 3 additions and 1 deletions

View File

@ -1262,6 +1262,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
mndTransSetDbName(pTrans, pDb, pDb); mndTransSetDbName(pTrans, pDb, pDb);
taosMemoryFree((void *)pDb); taosMemoryFree((void *)pDb);
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) { if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
tstrerror(TSDB_CODE_MND_TRANS_CONFLICT)); tstrerror(TSDB_CODE_MND_TRANS_CONFLICT));
@ -1815,7 +1817,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb); int32_t code = mndStreamRegisterTrans(pTrans, MND_STREAM_PAUSE_NAME, pStream->sourceDb, pStream->targetDb);
// pause all tasks // if nodeUpdate happened, not send pause trans
if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) { if (mndPauseAllStreamTasks(pMnode, pTrans, pStream) < 0) {
mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr()); mError("stream:%s, failed to pause task since %s", pauseReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);