fix(stream): fix deadlock around execInfo.lock in the stream-trans conflict check.

This commit is contained in:
Haojun Liao 2023-11-24 22:53:55 +08:00
parent 3deab3cfbf
commit 788194c748
3 changed files with 30 additions and 21 deletions

View File

@ -52,7 +52,7 @@ int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb);
int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream); int32_t mndPersistStream(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream);
int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb); int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pSrcDb, const char* pDstDb);
bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb); bool streamTransConflictOtherTrans(SMnode *pMnode, const char *pSrcDb, const char *pDstDb, bool lock);
// for sma // for sma
// TODO refactor // TODO refactor

View File

@ -1348,7 +1348,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) { if (conflict) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
tFreeMDropStreamReq(&dropReq); tFreeMDropStreamReq(&dropReq);
@ -1831,7 +1831,7 @@ static int32_t mndProcessPauseStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) { if (conflict) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;
@ -1966,7 +1966,7 @@ static int32_t mndProcessResumeStreamReq(SRpcMsg *pReq) {
} }
// check if it is conflict with other trans in both sourceDb and targetDb. // check if it is conflict with other trans in both sourceDb and targetDb.
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, true);
if (conflict) { if (conflict) {
sdbRelease(pMnode->pSdb, pStream); sdbRelease(pMnode->pSdb, pStream);
return -1; return -1;
@ -2754,7 +2754,7 @@ static int32_t mndResetStatusFromCheckpoint(SMnode *pMnode, int32_t transId) {
break; break;
} }
bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb); bool conflict = streamTransConflictOtherTrans(pMnode, pStream->sourceDb, pStream->targetDb, false);
if (conflict) { if (conflict) {
mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans", mError("stream:%s other trans exists in DB:%s & %s failed to start reset-status trans",
pStream->name, pStream->sourceDb, pStream->targetDb); pStream->name, pStream->sourceDb, pStream->targetDb);

View File

@ -35,17 +35,15 @@ int32_t mndStreamRegisterTrans(STrans* pTrans, const char* pName, const char* pS
} }
int32_t clearFinishedTrans(SMnode* pMnode) { int32_t clearFinishedTrans(SMnode* pMnode) {
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
size_t keyLen = 0; size_t keyLen = 0;
SArray* pList = taosArrayInit(4, sizeof(SKeyInfo));
taosThreadMutexLock(&execInfo.lock);
void* pIter = NULL; void* pIter = NULL;
while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) { while ((pIter = taosHashIterate(execInfo.transMgmt.pDBTrans, pIter)) != NULL) {
SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter; SStreamTransInfo* pEntry = (SStreamTransInfo*)pIter;
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
// let's clear the finished trans // let's clear the finished trans
STrans* pTrans = mndAcquireTrans(pMnode, pEntry->transId);
if (pTrans == NULL) { if (pTrans == NULL) {
void* pKey = taosHashGetKey(pEntry, &keyLen); void* pKey = taosHashGetKey(pEntry, &keyLen);
// key is the name of src/dst db name // key is the name of src/dst db name
@ -66,38 +64,49 @@ int32_t clearFinishedTrans(SMnode* pMnode) {
} }
mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans)); mDebug("clear %d finished stream-trans, remained:%d", (int32_t)num, taosHashGetSize(execInfo.transMgmt.pDBTrans));
taosThreadMutexUnlock(&execInfo.lock);
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
taosArrayDestroy(pList); taosArrayDestroy(pList);
return 0; return 0;
} }
bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb) { bool streamTransConflictOtherTrans(SMnode* pMnode, const char* pSrcDb, const char* pDstDb, bool lock) {
clearFinishedTrans(pMnode); if (lock) {
taosThreadMutexLock(&execInfo.lock); taosThreadMutexLock(&execInfo.lock);
}
int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans); int32_t num = taosHashGetSize(execInfo.transMgmt.pDBTrans);
if (num <= 0) { if (num <= 0) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
return false; return false;
} }
clearFinishedTrans(pMnode);
SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb)); SStreamTransInfo *pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pSrcDb, strlen(pSrcDb));
if (pEntry != NULL) { if (pEntry != NULL) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true; return true;
} }
pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb)); pEntry = taosHashGet(execInfo.transMgmt.pDBTrans, pDstDb, strlen(pDstDb));
if (pEntry != NULL) { if (pEntry != NULL) {
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name); mWarn("conflict with other transId:%d in Db:%s, trans:%s", pEntry->transId, pSrcDb, pEntry->name);
return true; return true;
} }
if (lock) {
taosThreadMutexUnlock(&execInfo.lock); taosThreadMutexUnlock(&execInfo.lock);
}
return false; return false;
} }