diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 86d34502c6..e17a72992c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3112,7 +3112,7 @@ typedef struct { int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq); int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); -void tFreeSMDropStreamReq(SMDropStreamReq* pReq); +void tFreeMDropStreamReq(SMDropStreamReq* pReq); typedef struct { char name[TSDB_STREAM_FNAME_LEN]; diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index dc3ba7934f..01b1df9d5f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -7152,7 +7152,7 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq * return 0; } -void tFreeSMDropStreamReq(SMDropStreamReq *pReq) { +void tFreeMDropStreamReq(SMDropStreamReq *pReq) { FREESQL(); } diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 3b0559741e..8ccda66212 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -922,7 +922,6 @@ _OVER: } mndReleaseStream(pMnode, pStream); - tFreeSCMCreateStreamReq(&createStreamReq); tFreeStreamObj(&streamObj); if (sql != NULL) { @@ -1362,26 +1361,27 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (dropReq.igNotExists) { mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; mError("stream:%s not exist failed to drop", dropReq.name); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } } if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1389,7 +1389,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1399,7 +1399,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (mndTransCheckConflict(pMnode, pTrans) != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1410,7 +1410,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1418,7 +1418,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1426,7 +1426,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1439,7 +1439,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -1857,7 +1857,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } @@ -1992,7 +1992,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } diff --git a/source/dnode/mnode/impl/src/mndStreamTrans.c b/source/dnode/mnode/impl/src/mndStreamTrans.c index cf48469630..3e2afe4ade 100644 --- a/source/dnode/mnode/impl/src/mndStreamTrans.c +++ b/source/dnode/mnode/impl/src/mndStreamTrans.c @@ -86,12 +86,14 @@ bool mndStreamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); if (pEntry != NULL) { taosThreadMutexUnlock(&execInfo.lock); + mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); if (pEntry != NULL) { taosThreadMutexUnlock(&execInfo.lock); + mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); return true; } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 1a65a29259..99c1176197 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -7958,7 +7958,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt tNameGetFullDbName(&name, dropReq.name); dropReq.igNotExists = pStmt->ignoreNotExists; int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return code; } diff --git a/tests/script/tsim/stream/pauseAndResume.sim b/tests/script/tsim/stream/pauseAndResume.sim index 673bc77c0f..5eb9eef010 100644 --- a/tests/script/tsim/stream/pauseAndResume.sim +++ b/tests/script/tsim/stream/pauseAndResume.sim @@ -16,9 +16,8 @@ sql create table ts2 using st tags(2,2,2); sql create table ts3 using st tags(3,2,2); sql create table ts4 using st tags(4,2,2); sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s); -sleep 1000 +sleep 2000 -sleep 1000 sql pause stream streams1; sql insert into ts1 values(1648791213001,1,12,3,1.0);