fix(stream): fix the invalid check of trans conflict.
This commit is contained in:
commit
9988720a1e
|
@ -3112,7 +3112,7 @@ typedef struct {
|
||||||
|
|
||||||
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
|
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
|
||||||
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
|
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
|
||||||
void tFreeSMDropStreamReq(SMDropStreamReq* pReq);
|
void tFreeMDropStreamReq(SMDropStreamReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_STREAM_FNAME_LEN];
|
char name[TSDB_STREAM_FNAME_LEN];
|
||||||
|
|
|
@ -7152,7 +7152,7 @@ int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void tFreeSMDropStreamReq(SMDropStreamReq *pReq) {
|
void tFreeMDropStreamReq(SMDropStreamReq *pReq) {
|
||||||
FREESQL();
|
FREESQL();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -922,7 +922,6 @@ _OVER:
|
||||||
}
|
}
|
||||||
|
|
||||||
mndReleaseStream(pMnode, pStream);
|
mndReleaseStream(pMnode, pStream);
|
||||||
|
|
||||||
tFreeSCMCreateStreamReq(&createStreamReq);
|
tFreeSCMCreateStreamReq(&createStreamReq);
|
||||||
tFreeStreamObj(&streamObj);
|
tFreeStreamObj(&streamObj);
|
||||||
if (sql != NULL) {
|
if (sql != NULL) {
|
||||||
|
@ -1362,26 +1361,27 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
if (dropReq.igNotExists) {
|
if (dropReq.igNotExists) {
|
||||||
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
|
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
|
||||||
mError("stream:%s not exist failed to drop", dropReq.name);
|
mError("stream:%s not exist failed to drop", dropReq.name);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
if (!conflict) {
|
if (conflict) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1389,7 +1389,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
if (pTrans == NULL) {
|
if (pTrans == NULL) {
|
||||||
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1399,7 +1399,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1410,7 +1410,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1418,7 +1418,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1426,7 +1426,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
|
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1439,7 +1439,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
mndTransDrop(pTrans);
|
mndTransDrop(pTrans);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
|
|
||||||
return TSDB_CODE_ACTION_IN_PROGRESS;
|
return TSDB_CODE_ACTION_IN_PROGRESS;
|
||||||
}
|
}
|
||||||
|
@ -1857,7 +1857,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
if (!conflict) {
|
if (conflict) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -1992,7 +1992,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
|
||||||
|
|
||||||
// check if it is conflict with other trans in both sourceDb and targetDb.
|
// check if it is conflict with other trans in both sourceDb and targetDb.
|
||||||
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
bool conflict = mndStreamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
|
||||||
if (!conflict) {
|
if (conflict) {
|
||||||
sdbRelease(pMnode->pSdb, pStream);
|
sdbRelease(pMnode->pSdb, pStream);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,12 +86,14 @@ bool mndStreamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const
|
||||||
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
|
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
|
||||||
if (pEntry != NULL) {
|
if (pEntry != NULL) {
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
|
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
|
||||||
if (pEntry != NULL) {
|
if (pEntry != NULL) {
|
||||||
taosThreadMutexUnlock(&execInfo.lock);
|
taosThreadMutexUnlock(&execInfo.lock);
|
||||||
|
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -7958,7 +7958,7 @@ static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pSt
|
||||||
tNameGetFullDbName(&name, dropReq.name);
|
tNameGetFullDbName(&name, dropReq.name);
|
||||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||||
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
|
int32_t code = buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
|
||||||
tFreeSMDropStreamReq(&dropReq);
|
tFreeMDropStreamReq(&dropReq);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -16,9 +16,8 @@ sql create table ts2 using st tags(2,2,2);
|
||||||
sql create table ts3 using st tags(3,2,2);
|
sql create table ts3 using st tags(3,2,2);
|
||||||
sql create table ts4 using st tags(4,2,2);
|
sql create table ts4 using st tags(4,2,2);
|
||||||
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
|
sql create stream streams1 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 watermark 1d into streamt1 as select _wstart, count(*) c1, sum(a) c3 from st interval(10s);
|
||||||
sleep 1000
|
sleep 2000
|
||||||
|
|
||||||
sleep 1000
|
|
||||||
sql pause stream streams1;
|
sql pause stream streams1;
|
||||||
|
|
||||||
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
sql insert into ts1 values(1648791213001,1,12,3,1.0);
|
||||||
|
|
Loading…
Reference in New Issue