fix(stream): fix error in check trans conflict.

This commit is contained in:
Haojun Liao 2023-11-22 16:20:59 +08:00
parent f3355000a3
commit ff9c0c0df4
2 changed files with 19 additions and 41 deletions

View File

@ -3182,7 +3182,7 @@ typedef struct {
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
void tFreeSMDropStreamReq(SMDropStreamReq* pReq);
void tFreeMDropStreamReq(SMDropStreamReq* pReq);
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];

View File

@ -132,8 +132,11 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_STREAM_TASKS, mndCancelGetNextStreamTask);
taosThreadMutexInit(&execInfo.lock, NULL);
execInfo.pTaskMap = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, HASH_NO_LOCK);
_hash_fn_t fn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
execInfo.pTaskList = taosArrayInit(4, sizeof(STaskId));
execInfo.pTaskMap = taosHashInit(64, fn, true, HASH_NO_LOCK);
execInfo.transMgmt.pDBTrans = taosHashInit(32, fn, true, HASH_NO_LOCK);
return sdbSetTable(pMnode->pSdb, table);
}
@ -719,24 +722,6 @@ int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream)
return 0;
}
static bool checkDbPrecision(SMnode* pMnode, SStreamObj* pStreamObj) {
if (pStreamObj->sourceDbUid != pStreamObj->targetDbUid) {
SDbObj *pSrcDb = mndAcquireDb(pMnode, pStreamObj->sourceDb);
SDbObj *pDstDb = mndAcquireDb(pMnode, pStreamObj->targetDb);
bool isIdentical = (pSrcDb->cfg.precision != pDstDb->cfg.precision);
mndReleaseDb(pMnode, pSrcDb);
mndReleaseDb(pMnode, pDstDb);
if (!isIdentical) {
mError("stream:%s failed to create since target/source db precision not identical", pStreamObj->name);
return false;
}
}
return true;
}
static int32_t checkForNumOfStreams(SMnode *pMnode, SStreamObj *pStreamObj) { // check for number of existed tasks
int32_t numOfStream = 0;
SStreamObj *pStream = NULL;
@ -809,12 +794,6 @@ static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq) {
goto _OVER;
}
// check if the time precision for source&target DB is identical.
bool isIdentical = checkDbPrecision(pMnode, &streamObj);
if (!isIdentical) {
goto _OVER;
}
code = checkForNumOfStreams(pMnode, &streamObj);
if (code != TSDB_CODE_SUCCESS) {
goto _OVER;
@ -896,7 +875,6 @@ _OVER:
}
mndReleaseStream(pMnode, pStream);
tFreeSCMCreateStreamReq(&createStreamReq);
tFreeStreamObj(&streamObj);
return code;
@ -1278,9 +1256,8 @@ static int32_t mndProcessStreamDoCheckpoint(SRpcMsg *pReq) {
const char *pDb = mndGetStreamDB(pMnode);
mndTransSetDbName(pTrans, pDb, pDb);
taosMemoryFree((void *)pDb);
mndStreamRegisterTrans(pTrans, MND_STREAM_CHECKPOINT_NAME, pDb, pDb);
taosMemoryFree((void *)pDb);
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
mError("failed to trigger checkpoint, checkpointId: %" PRId64 ", reason:%s", checkpointId,
@ -1331,26 +1308,27 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (dropReq.igNotExists) {
mInfo("stream:%s not exist, ignore not exist is set", dropReq.name);
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return 0;
} else {
terrno = TSDB_CODE_MND_STREAM_NOT_EXIST;
mError("stream:%s not exist failed to drop", dropReq.name);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
}
if (mndCheckDbPrivilegeByName(pMnode, pReq->info.conn.user, MND_OPER_WRITE_DB, pStream->targetDb) != 0) {
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (!conflict) {
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1358,7 +1336,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (pTrans == NULL) {
mError("stream:%s, failed to drop since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1368,7 +1346,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1379,7 +1357,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mError("stream:%s, failed to drop task since %s", dropReq.name, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1387,7 +1365,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
if (mndPersistDropStreamLog(pMnode, pTrans, pStream) < 0) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1395,7 +1373,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
mError("trans:%d, failed to prepare drop stream trans since %s", pTrans->id, terrstr());
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return -1;
}
@ -1408,7 +1386,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
sdbRelease(pMnode->pSdb, pStream);
mndTransDrop(pTrans);
tFreeSMDropStreamReq(&dropReq);
tFreeMDropStreamReq(&dropReq);
return TSDB_CODE_ACTION_IN_PROGRESS;
}
@ -1826,7 +1804,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (!conflict) {
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}
@ -1961,7 +1939,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
// check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb);
if (!conflict) {
if (conflict) {
sdbRelease(pMnode->pSdb, pStream);
return -1;
}