From ff9c0c0df4515e45615c1942807d20f7d3e12f1f Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Wed, 22 Nov 2023 16:20:59 +0800 Subject: [PATCH] fix(stream): fix error in check trans conflict. --- include/common/tmsg.h | 2 +- source/dnode/mnode/impl/src/mndStream.c | 58 ++++++++----------------- 2 files changed, 19 insertions(+), 41 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d08b424e9c..328a1e383c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -3182,7 +3182,7 @@ typedef struct { int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq); int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq); -void tFreeSMDropStreamReq(SMDropStreamReq* pReq); +void tFreeMDropStreamReq(SMDropStreamReq* pReq); typedef struct { char name[TSDB_STREAM_FNAME_LEN]; diff --git a/source/dnode/mnode/impl/src/mndStream.c b/source/dnode/mnode/impl/src/mndStream.c index 6755204622..21bf8b8dc5 100644 --- a/source/dnode/mnode/impl/src/mndStream.c +++ b/source/dnode/mnode/impl/src/mndStream.c @@ -132,8 +132,11 @@ int32_t mndInitStream(SMnode *pMnode) { mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask); taosThreadMutexInit(&execInfo.lock, NULL); - execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK); + _hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR); + execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId)); + execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK); + execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK); return sdbSetTable(pMnode->pSdb, table); } @@ -719,24 +722,6 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) return 0; } -static bool checkDbPrecision(SMnode* pMnode, SStreamObj* pStreamObj) { - if (pStreamObj->sourceDbUid != pStreamObj->targetDbUid) { - SDbObj *pSrcDb = mndAcquireDb(pMnode, pStreamObj->sourceDb); - SDbObj *pDstDb = mndAcquireDb(pMnode, pStreamObj->targetDb); - - bool isIdentical = (pSrcDb->cfg.precision != pDstDb->cfg.precision); - mndReleaseDb(pMnode, pSrcDb); - mndReleaseDb(pMnode, pDstDb); - - if (!isIdentical) { - mError("stream:%s failed to create since target/source db precision not identical", pStreamObj->name); - return false; - } - } - - return true; -} - static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks int32_t numOfStream = 0; SStreamObj *pStream = NULL; @@ -809,12 +794,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) { goto _OVER; } - // check if the time precision for source&target DB is identical. - bool isIdentical = checkDbPrecision(pMnode, &streamObj); - if (!isIdentical) { - goto _OVER; - } - code = checkForNumOfStreams(pMnode, &streamObj); if (code != TSDB_CODE_SUCCESS) { goto _OVER; @@ -896,7 +875,6 @@ _OVER: } mndReleaseStream(pMnode, pStream); - tFreeSCMCreateStreamReq(&createStreamReq); tFreeStreamObj(&streamObj); return code; @@ -1278,9 +1256,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) { const char *pDb = mndGetStreamDB(pMnode); mndTransSetDbName(pTrans, pDb, pDb); - taosMemoryFree((void *)pDb); - mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb); + taosMemoryFree((void *)pDb); if (mndTransCheckConflict(pMnode, pTrans) != 0) { mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId, @@ -1331,26 +1308,27 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (dropReq.igNotExists) { mInfo("stream:%s not exist, ignore not exist is set", dropReq.name); sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return 0; } else { terrno = TSDB_CODE_MND_STREAM_NOT_EXIST; mError("stream:%s not exist failed to drop", dropReq.name); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } } if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) { sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1358,7 +1336,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (pTrans == NULL) { mError("stream:%s, failed to drop since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1368,7 +1346,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (mndTransCheckConflict(pMnode, pTrans) != 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1379,7 +1357,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1387,7 +1365,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1395,7 +1373,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr()); sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return -1; } @@ -1408,7 +1386,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pStream); mndTransDrop(pTrans); - tFreeSMDropStreamReq(&dropReq); + tFreeMDropStreamReq(&dropReq); return TSDB_CODE_ACTION_IN_PROGRESS; } @@ -1826,7 +1804,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; } @@ -1961,7 +1939,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) { // check if it is conflict with other trans in both sourceDb and targetDb. bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); - if (!conflict) { + if (conflict) { sdbRelease(pMnode->pSdb, pStream); return -1; }